summaryrefslogtreecommitdiffstats
path: root/meta
diff options
context:
space:
mode:
authorPatrick Ohly <patrick.ohly@intel.com>2017-06-27 13:03:41 +0200
committerRichard Purdie <richard.purdie@linuxfoundation.org>2017-06-28 20:55:09 +0100
commit8b6fde28e9fe7503f6946500e5b4ee30664501b9 (patch)
treedd5f2664449018fed5d62d364e267e427542aa1a /meta
parentea34f20a06e3c5f2912fca0e71e29b17a1d9df59 (diff)
downloadpoky-8b6fde28e9fe7503f6946500e5b4ee30664501b9.tar.gz
runcmd.py: unit testing for runCmd()
This covers the traditional API as well as the new output_log feature. While testing, it was noticed that killing hanging commands does not work when a shell is used to run the command(s). This might be worth fixing. (From OE-Core rev: 62489e58ca9975f58b48fc2bd8cf27fd22e25564) Signed-off-by: Patrick Ohly <patrick.ohly@intel.com> Signed-off-by: Ross Burton <ross.burton@intel.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'meta')
-rw-r--r--meta/lib/oeqa/selftest/cases/runcmd.py117
1 files changed, 117 insertions, 0 deletions
diff --git a/meta/lib/oeqa/selftest/cases/runcmd.py b/meta/lib/oeqa/selftest/cases/runcmd.py
new file mode 100644
index 0000000000..6c785caadc
--- /dev/null
+++ b/meta/lib/oeqa/selftest/cases/runcmd.py
@@ -0,0 +1,117 @@
1from oeqa.selftest.case import OESelftestTestCase
2from oeqa.utils.commands import runCmd
3from oeqa.utils import CommandError
4
5import subprocess
6import threading
7import time
8import signal
9
10class MemLogger(object):
11 def __init__(self):
12 self.info_msgs = []
13 self.error_msgs = []
14
15 def info(self, msg):
16 self.info_msgs.append(msg)
17
18 def error(self, msg):
19 self.error_msgs.append(msg)
20
21class RunCmdTests(OESelftestTestCase):
22 """ Basic tests for runCmd() utility function """
23
24 # The delta is intentionally smaller than the timeout, to detect cases where
25 # we incorrectly apply the timeout more than once.
26 TIMEOUT = 2
27 DELTA = 1
28
29 def test_result_okay(self):
30 result = runCmd("true")
31 self.assertEqual(result.status, 0)
32
33 def test_result_false(self):
34 result = runCmd("false", ignore_status=True)
35 self.assertEqual(result.status, 1)
36
37 def test_shell(self):
38 # A shell is used for all string commands.
39 result = runCmd("false; true", ignore_status=True)
40 self.assertEqual(result.status, 0)
41
42 def test_no_shell(self):
43 self.assertRaises(FileNotFoundError,
44 runCmd, "false; true", shell=False)
45
46 def test_list_not_found(self):
47 self.assertRaises(FileNotFoundError,
48 runCmd, ["false; true"])
49
50 def test_list_okay(self):
51 result = runCmd(["true"])
52 self.assertEqual(result.status, 0)
53
54 def test_result_assertion(self):
55 self.assertRaisesRegexp(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar",
56 runCmd, "echo foobar >&2; false", shell=True)
57
58 def test_result_exception(self):
59 self.assertRaisesRegexp(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar",
60 runCmd, "echo foobar >&2; false", shell=True, assert_error=False)
61
62 def test_output(self):
63 result = runCmd("echo stdout; echo stderr >&2", shell=True)
64 self.assertEqual("stdout\nstderr", result.output)
65 self.assertEqual("", result.error)
66
67 def test_output_split(self):
68 result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE)
69 self.assertEqual("stdout", result.output)
70 self.assertEqual("stderr", result.error)
71
72 def test_timeout(self):
73 numthreads = threading.active_count()
74 start = time.time()
75 # Killing a hanging process only works when not using a shell?!
76 result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True)
77 self.assertEqual(result.status, -signal.SIGTERM)
78 end = time.time()
79 self.assertLess(end - start, self.TIMEOUT + self.DELTA)
80 self.assertEqual(numthreads, threading.active_count())
81
82 def test_timeout_split(self):
83 numthreads = threading.active_count()
84 start = time.time()
85 # Killing a hanging process only works when not using a shell?!
86 result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, stderr=subprocess.PIPE)
87 self.assertEqual(result.status, -signal.SIGTERM)
88 end = time.time()
89 self.assertLess(end - start, self.TIMEOUT + self.DELTA)
90 self.assertEqual(numthreads, threading.active_count())
91
92 def test_stdin(self):
93 numthreads = threading.active_count()
94 result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT)
95 self.assertEqual("hello world", result.output)
96 self.assertEqual(numthreads, threading.active_count())
97
98 def test_stdin_timeout(self):
99 numthreads = threading.active_count()
100 start = time.time()
101 result = runCmd(['sleep', '60'], data=b"hello world", timeout=self.TIMEOUT, ignore_status=True)
102 self.assertEqual(result.status, -signal.SIGTERM)
103 end = time.time()
104 self.assertLess(end - start, self.TIMEOUT + self.DELTA)
105 self.assertEqual(numthreads, threading.active_count())
106
107 def test_log(self):
108 log = MemLogger()
109 result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log)
110 self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs)
111 self.assertEqual([], log.error_msgs)
112
113 def test_log_split(self):
114 log = MemLogger()
115 result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE)
116 self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs)
117 self.assertEqual(["stderr"], log.error_msgs)