summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/selftest/cases/runcmd.py
blob: 6c785caadc348355d9c81c2cb211a756c56b7934 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from oeqa.selftest.case import OESelftestTestCase
from oeqa.utils.commands import runCmd
from oeqa.utils import CommandError

import subprocess
import threading
import time
import signal

class MemLogger(object):
    """Logger stand-in that keeps every message in memory.

    Messages passed to info() and error() are collected, in call order,
    in the ``info_msgs`` and ``error_msgs`` lists respectively, so that a
    test can inspect afterwards exactly what was logged.
    """

    def __init__(self):
        # Two independent lists, one per log level.
        self.info_msgs, self.error_msgs = [], []

    def info(self, msg):
        # Record an info-level message.
        self.info_msgs += [msg]

    def error(self, msg):
        # Record an error-level message.
        self.error_msgs += [msg]

class RunCmdTests(OESelftestTestCase):
    """ Basic tests for runCmd() utility function """

    # The delta is intentionally smaller than the timeout, to detect cases where
    # we incorrectly apply the timeout more than once.
    TIMEOUT = 2
    DELTA = 1

    def _assert_sleep_is_killed(self, **kwargs):
        # Shared body of the timeout tests (not collected as a test itself):
        # run a command that outlives TIMEOUT and check that it is reported
        # as terminated by SIGTERM, that runCmd() returned within
        # TIMEOUT + DELTA seconds, and that the number of active threads is
        # the same as before the call. Extra keyword arguments are passed
        # straight through to runCmd().
        numthreads = threading.active_count()
        start = time.time()
        # Killing a hanging process only works when not using a shell?!
        result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, **kwargs)
        self.assertEqual(result.status, -signal.SIGTERM)
        end = time.time()
        self.assertLess(end - start, self.TIMEOUT + self.DELTA)
        self.assertEqual(numthreads, threading.active_count())

    def test_result_okay(self):
        # A succeeding command is reported with status 0.
        result = runCmd("true")
        self.assertEqual(result.status, 0)

    def test_result_false(self):
        # With ignore_status=True a failing command is returned rather than
        # raised, and its exit status is visible to the caller.
        result = runCmd("false", ignore_status=True)
        self.assertEqual(result.status, 1)

    def test_shell(self):
        # A shell is used for all string commands.
        result = runCmd("false; true", ignore_status=True)
        self.assertEqual(result.status, 0)

    def test_no_shell(self):
        # Explicitly disabling the shell for a string command is expected to
        # fail with FileNotFoundError.
        self.assertRaises(FileNotFoundError,
                          runCmd, "false; true", shell=False)

    def test_list_not_found(self):
        # A list command whose only element is not an existing program is
        # expected to fail with FileNotFoundError.
        self.assertRaises(FileNotFoundError,
                          runCmd, ["false; true"])

    def test_list_okay(self):
        # A list command naming a real program runs and succeeds.
        result = runCmd(["true"])
        self.assertEqual(result.status, 0)

    def test_result_assertion(self):
        # By default a failing command raises AssertionError whose message
        # carries the command, the exit status and the captured output.
        # assertRaisesRegex is used because the old assertRaisesRegexp alias
        # was removed in Python 3.12.
        self.assertRaisesRegex(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar",
                               runCmd, "echo foobar >&2; false", shell=True)

    def test_result_exception(self):
        # With assert_error=False the failure is reported as CommandError
        # instead of AssertionError.
        self.assertRaisesRegex(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar",
                               runCmd, "echo foobar >&2; false", shell=True, assert_error=False)

    def test_output(self):
        # Without a separate stderr pipe, stderr ends up in result.output
        # together with stdout and result.error stays empty.
        result = runCmd("echo stdout; echo stderr >&2", shell=True)
        self.assertEqual("stdout\nstderr", result.output)
        self.assertEqual("", result.error)

    def test_output_split(self):
        # With stderr=subprocess.PIPE the two streams are reported separately.
        result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE)
        self.assertEqual("stdout", result.output)
        self.assertEqual("stderr", result.error)

    def test_timeout(self):
        # Timeout handling with the default stream setup.
        self._assert_sleep_is_killed()

    def test_timeout_split(self):
        # Timeout handling when stderr is captured through its own pipe.
        self._assert_sleep_is_killed(stderr=subprocess.PIPE)

    def test_stdin(self):
        # Data passed via `data` reaches the command's stdin, and no extra
        # thread is left running afterwards.
        numthreads = threading.active_count()
        result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT)
        self.assertEqual("hello world", result.output)
        self.assertEqual(numthreads, threading.active_count())

    def test_stdin_timeout(self):
        # Timeout handling when input data is supplied to a command that
        # never reads it.
        self._assert_sleep_is_killed(data=b"hello world")

    def test_log(self):
        # With output_log set, the command line and every output line are
        # logged at info level when stderr is not split off.
        log = MemLogger()
        runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log)
        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs)
        self.assertEqual([], log.error_msgs)

    def test_log_split(self):
        # With a separate stderr pipe, stderr lines are logged at error level
        # while the command line and stdout stay at info level.
        log = MemLogger()
        runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE)
        self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs)
        self.assertEqual(["stderr"], log.error_msgs)