summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--meta/lib/oeqa/utils/commands.py107
1 files changed, 85 insertions, 22 deletions
diff --git a/meta/lib/oeqa/utils/commands.py b/meta/lib/oeqa/utils/commands.py
index 57286fcb10..5e5345434d 100644
--- a/meta/lib/oeqa/utils/commands.py
+++ b/meta/lib/oeqa/utils/commands.py
@@ -13,6 +13,7 @@ import sys
13import signal 13import signal
14import subprocess 14import subprocess
15import threading 15import threading
16import time
16import logging 17import logging
17from oeqa.utils import CommandError 18from oeqa.utils import CommandError
18from oeqa.utils import ftools 19from oeqa.utils import ftools
@@ -25,7 +26,7 @@ except ImportError:
25 pass 26 pass
26 27
27class Command(object): 28class Command(object):
28 def __init__(self, command, bg=False, timeout=None, data=None, **options): 29 def __init__(self, command, bg=False, timeout=None, data=None, output_log=None, **options):
29 30
30 self.defaultopts = { 31 self.defaultopts = {
31 "stdout": subprocess.PIPE, 32 "stdout": subprocess.PIPE,
@@ -48,41 +49,103 @@ class Command(object):
48 self.options.update(options) 49 self.options.update(options)
49 50
50 self.status = None 51 self.status = None
52 # We collect chunks of output before joining them at the end.
53 self._output_chunks = []
54 self._error_chunks = []
51 self.output = None 55 self.output = None
52 self.error = None 56 self.error = None
53 self.thread = None 57 self.threads = []
54 58
59 self.output_log = output_log
55 self.log = logging.getLogger("utils.commands") 60 self.log = logging.getLogger("utils.commands")
56 61
57 def run(self): 62 def run(self):
58 self.process = subprocess.Popen(self.cmd, **self.options) 63 self.process = subprocess.Popen(self.cmd, **self.options)
59 64
60 def commThread(): 65 def readThread(output, stream, logfunc):
61 self.output, self.error = self.process.communicate(self.data) 66 if logfunc:
62 67 for line in stream:
63 self.thread = threading.Thread(target=commThread) 68 output.append(line)
64 self.thread.start() 69 logfunc(line.decode("utf-8", errors='replace').rstrip())
70 else:
71 output.append(stream.read())
72
73 def readStderrThread():
74 readThread(self._error_chunks, self.process.stderr, self.output_log.error if self.output_log else None)
75
76 def readStdoutThread():
77 readThread(self._output_chunks, self.process.stdout, self.output_log.info if self.output_log else None)
78
79 def writeThread():
80 try:
81 self.process.stdin.write(self.data)
82 self.process.stdin.close()
83 except OSError as ex:
84 # It's not an error when the command does not consume all
85 # of our data. subprocess.communicate() also ignores that.
86 if ex.errno != EPIPE:
87 raise
88
89 # We write in a separate thread because then we can read
90 # without worrying about deadlocks. The additional thread is
91 # expected to terminate by itself and we mark it as a daemon,
92 # so even it should happen to not terminate for whatever
93 # reason, the main process will still exit, which will then
94 # kill the write thread.
95 if self.data:
96 threading.Thread(target=writeThread, daemon=True).start()
97 if self.process.stderr:
98 thread = threading.Thread(target=readStderrThread)
99 thread.start()
100 self.threads.append(thread)
101 if self.output_log:
102 self.output_log.info('Running: %s' % self.cmd)
103 thread = threading.Thread(target=readStdoutThread)
104 thread.start()
105 self.threads.append(thread)
65 106
66 self.log.debug("Running command '%s'" % self.cmd) 107 self.log.debug("Running command '%s'" % self.cmd)
67 108
68 if not self.bg: 109 if not self.bg:
69 self.thread.join(self.timeout) 110 if self.timeout is None:
111 for thread in self.threads:
112 thread.join()
113 else:
114 deadline = time.time() + self.timeout
115 for thread in self.threads:
116 timeout = deadline - time.time()
117 if timeout < 0:
118 timeout = 0
119 thread.join(timeout)
70 self.stop() 120 self.stop()
71 121
72 def stop(self): 122 def stop(self):
73 if self.thread.isAlive(): 123 for thread in self.threads:
74 self.process.terminate() 124 if thread.isAlive():
125 self.process.terminate()
75 # let's give it more time to terminate gracefully before killing it 126 # let's give it more time to terminate gracefully before killing it
76 self.thread.join(5) 127 thread.join(5)
77 if self.thread.isAlive(): 128 if thread.isAlive():
78 self.process.kill() 129 self.process.kill()
79 self.thread.join() 130 thread.join()
80 131
81 if not self.output: 132 def finalize_output(data):
82 self.output = "" 133 if not data:
83 else: 134 data = ""
84 self.output = self.output.decode("utf-8", errors='replace').rstrip() 135 else:
85 self.status = self.process.poll() 136 data = b"".join(data)
137 data = data.decode("utf-8", errors='replace').rstrip()
138 return data
139
140 self.output = finalize_output(self._output_chunks)
141 self._output_chunks = None
142 # self.error used to be a byte string earlier, probably unintentionally.
143 # Now it is a normal string, just like self.output.
144 self.error = finalize_output(self._error_chunks)
145 self._error_chunks = None
146 # At this point we know that the process has closed stdout/stderr, so
147 # it is safe and necessary to wait for the actual process completion.
148 self.status = self.process.wait()
86 149
87 self.log.debug("Command '%s' returned %d as exit code." % (self.cmd, self.status)) 150 self.log.debug("Command '%s' returned %d as exit code." % (self.cmd, self.status))
88 # logging the complete output is insane 151 # logging the complete output is insane
@@ -98,7 +161,7 @@ class Result(object):
98 161
99 162
100def runCmd(command, ignore_status=False, timeout=None, assert_error=True, 163def runCmd(command, ignore_status=False, timeout=None, assert_error=True,
101 native_sysroot=None, limit_exc_output=0, **options): 164 native_sysroot=None, limit_exc_output=0, output_log=None, **options):
102 result = Result() 165 result = Result()
103 166
104 if native_sysroot: 167 if native_sysroot:
@@ -108,7 +171,7 @@ def runCmd(command, ignore_status=False, timeout=None, assert_error=True,
108 nenv['PATH'] = extra_paths + ':' + nenv.get('PATH', '') 171 nenv['PATH'] = extra_paths + ':' + nenv.get('PATH', '')
109 options['env'] = nenv 172 options['env'] = nenv
110 173
111 cmd = Command(command, timeout=timeout, **options) 174 cmd = Command(command, timeout=timeout, output_log=output_log, **options)
112 cmd.run() 175 cmd.run()
113 176
114 result.command = command 177 result.command = command
@@ -132,7 +195,7 @@ def runCmd(command, ignore_status=False, timeout=None, assert_error=True,
132 return result 195 return result
133 196
134 197
135def bitbake(command, ignore_status=False, timeout=None, postconfig=None, **options): 198def bitbake(command, ignore_status=False, timeout=None, postconfig=None, output_log=None, **options):
136 199
137 if postconfig: 200 if postconfig:
138 postconfig_file = os.path.join(os.environ.get('BUILDDIR'), 'oeqa-post.conf') 201 postconfig_file = os.path.join(os.environ.get('BUILDDIR'), 'oeqa-post.conf')
@@ -147,7 +210,7 @@ def bitbake(command, ignore_status=False, timeout=None, postconfig=None, **optio
147 cmd = [ "bitbake" ] + [a for a in (command + extra_args.split(" ")) if a not in [""]] 210 cmd = [ "bitbake" ] + [a for a in (command + extra_args.split(" ")) if a not in [""]]
148 211
149 try: 212 try:
150 return runCmd(cmd, ignore_status, timeout, **options) 213 return runCmd(cmd, ignore_status, timeout, output_log=output_log, **options)
151 finally: 214 finally:
152 if postconfig: 215 if postconfig:
153 os.remove(postconfig_file) 216 os.remove(postconfig_file)