diff options
-rw-r--r-- | meta/lib/oeqa/selftest/cases/runcmd.py | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/meta/lib/oeqa/selftest/cases/runcmd.py b/meta/lib/oeqa/selftest/cases/runcmd.py new file mode 100644 index 0000000000..6c785caadc --- /dev/null +++ b/meta/lib/oeqa/selftest/cases/runcmd.py | |||
@@ -0,0 +1,117 @@ | |||
1 | from oeqa.selftest.case import OESelftestTestCase | ||
2 | from oeqa.utils.commands import runCmd | ||
3 | from oeqa.utils import CommandError | ||
4 | |||
5 | import subprocess | ||
6 | import threading | ||
7 | import time | ||
8 | import signal | ||
9 | |||
10 | class MemLogger(object): | ||
11 | def __init__(self): | ||
12 | self.info_msgs = [] | ||
13 | self.error_msgs = [] | ||
14 | |||
15 | def info(self, msg): | ||
16 | self.info_msgs.append(msg) | ||
17 | |||
18 | def error(self, msg): | ||
19 | self.error_msgs.append(msg) | ||
20 | |||
21 | class RunCmdTests(OESelftestTestCase): | ||
22 | """ Basic tests for runCmd() utility function """ | ||
23 | |||
24 | # The delta is intentionally smaller than the timeout, to detect cases where | ||
25 | # we incorrectly apply the timeout more than once. | ||
26 | TIMEOUT = 2 | ||
27 | DELTA = 1 | ||
28 | |||
29 | def test_result_okay(self): | ||
30 | result = runCmd("true") | ||
31 | self.assertEqual(result.status, 0) | ||
32 | |||
33 | def test_result_false(self): | ||
34 | result = runCmd("false", ignore_status=True) | ||
35 | self.assertEqual(result.status, 1) | ||
36 | |||
37 | def test_shell(self): | ||
38 | # A shell is used for all string commands. | ||
39 | result = runCmd("false; true", ignore_status=True) | ||
40 | self.assertEqual(result.status, 0) | ||
41 | |||
42 | def test_no_shell(self): | ||
43 | self.assertRaises(FileNotFoundError, | ||
44 | runCmd, "false; true", shell=False) | ||
45 | |||
46 | def test_list_not_found(self): | ||
47 | self.assertRaises(FileNotFoundError, | ||
48 | runCmd, ["false; true"]) | ||
49 | |||
50 | def test_list_okay(self): | ||
51 | result = runCmd(["true"]) | ||
52 | self.assertEqual(result.status, 0) | ||
53 | |||
54 | def test_result_assertion(self): | ||
55 | self.assertRaisesRegexp(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar", | ||
56 | runCmd, "echo foobar >&2; false", shell=True) | ||
57 | |||
58 | def test_result_exception(self): | ||
59 | self.assertRaisesRegexp(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar", | ||
60 | runCmd, "echo foobar >&2; false", shell=True, assert_error=False) | ||
61 | |||
62 | def test_output(self): | ||
63 | result = runCmd("echo stdout; echo stderr >&2", shell=True) | ||
64 | self.assertEqual("stdout\nstderr", result.output) | ||
65 | self.assertEqual("", result.error) | ||
66 | |||
67 | def test_output_split(self): | ||
68 | result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE) | ||
69 | self.assertEqual("stdout", result.output) | ||
70 | self.assertEqual("stderr", result.error) | ||
71 | |||
72 | def test_timeout(self): | ||
73 | numthreads = threading.active_count() | ||
74 | start = time.time() | ||
75 | # Killing a hanging process only works when not using a shell?! | ||
76 | result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True) | ||
77 | self.assertEqual(result.status, -signal.SIGTERM) | ||
78 | end = time.time() | ||
79 | self.assertLess(end - start, self.TIMEOUT + self.DELTA) | ||
80 | self.assertEqual(numthreads, threading.active_count()) | ||
81 | |||
82 | def test_timeout_split(self): | ||
83 | numthreads = threading.active_count() | ||
84 | start = time.time() | ||
85 | # Killing a hanging process only works when not using a shell?! | ||
86 | result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, stderr=subprocess.PIPE) | ||
87 | self.assertEqual(result.status, -signal.SIGTERM) | ||
88 | end = time.time() | ||
89 | self.assertLess(end - start, self.TIMEOUT + self.DELTA) | ||
90 | self.assertEqual(numthreads, threading.active_count()) | ||
91 | |||
92 | def test_stdin(self): | ||
93 | numthreads = threading.active_count() | ||
94 | result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT) | ||
95 | self.assertEqual("hello world", result.output) | ||
96 | self.assertEqual(numthreads, threading.active_count()) | ||
97 | |||
98 | def test_stdin_timeout(self): | ||
99 | numthreads = threading.active_count() | ||
100 | start = time.time() | ||
101 | result = runCmd(['sleep', '60'], data=b"hello world", timeout=self.TIMEOUT, ignore_status=True) | ||
102 | self.assertEqual(result.status, -signal.SIGTERM) | ||
103 | end = time.time() | ||
104 | self.assertLess(end - start, self.TIMEOUT + self.DELTA) | ||
105 | self.assertEqual(numthreads, threading.active_count()) | ||
106 | |||
107 | def test_log(self): | ||
108 | log = MemLogger() | ||
109 | result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log) | ||
110 | self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs) | ||
111 | self.assertEqual([], log.error_msgs) | ||
112 | |||
113 | def test_log_split(self): | ||
114 | log = MemLogger() | ||
115 | result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE) | ||
116 | self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs) | ||
117 | self.assertEqual(["stderr"], log.error_msgs) | ||