diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2016-11-24 21:41:30 +0000 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2016-11-29 11:22:06 +0000 |
commit | 3d34ae6c11bc2e800f784740c54b2dd2029658ed (patch) | |
tree | b3c0b766e39dfbd5c5817410ba586e92f85252ae /bitbake | |
parent | 71b7c3a88ec5a20d5e4531751d0bf23d5c94e494 (diff) | |
download | poky-3d34ae6c11bc2e800f784740c54b2dd2029658ed.tar.gz |
bitbake: bitbake-worker: Handle cooker/worker IO deadlocking
I noiced builds where tasks seemed to be taking a surprisingly long time.
When I looked at the output of top/pstree, these tasks were no longer
running despite being listed in knotty. Some were in D/Z state waiting for
their exit code to be collected, others were simply not present at all.
strace showed communication problems between the worker and cooker, each
was trying to write to the other and nearly deadlocking. Eventually, timeouts
would allow them to echange 64kb of data but this was only happening every
few seconds.
Whilst this particularly affected builds on machines with large numbers
of cores (and hence highly parallal task execution) and in cases where
I had a lot of debug enabled, this situation is clearly bad in general.
This patch introduces a thread to the worker which is used to write data
back to cooker. This means that the deadlock can't occur and data flows
much more freely and effectively.
(Bitbake rev: 3cb0d1c78b4c2e4f251a59b86c8da583828ad08b)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rwxr-xr-x | bitbake/bin/bitbake-worker | 52 |
1 files changed, 36 insertions, 16 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index af66ff05c8..db3c4b184f 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker | |||
@@ -12,7 +12,9 @@ import errno | |||
12 | import signal | 12 | import signal |
13 | import pickle | 13 | import pickle |
14 | import traceback | 14 | import traceback |
15 | import queue | ||
15 | from multiprocessing import Lock | 16 | from multiprocessing import Lock |
17 | from threading import Thread | ||
16 | 18 | ||
17 | if sys.getfilesystemencoding() != "utf-8": | 19 | if sys.getfilesystemencoding() != "utf-8": |
18 | sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.") | 20 | sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.") |
@@ -64,7 +66,7 @@ if 0: | |||
64 | consolelog.setFormatter(conlogformat) | 66 | consolelog.setFormatter(conlogformat) |
65 | logger.addHandler(consolelog) | 67 | logger.addHandler(consolelog) |
66 | 68 | ||
67 | worker_queue = b"" | 69 | worker_queue = queue.Queue() |
68 | 70 | ||
69 | def worker_fire(event, d): | 71 | def worker_fire(event, d): |
70 | data = b"<event>" + pickle.dumps(event) + b"</event>" | 72 | data = b"<event>" + pickle.dumps(event) + b"</event>" |
@@ -73,21 +75,38 @@ def worker_fire(event, d): | |||
73 | def worker_fire_prepickled(event): | 75 | def worker_fire_prepickled(event): |
74 | global worker_queue | 76 | global worker_queue |
75 | 77 | ||
76 | worker_queue = worker_queue + event | 78 | worker_queue.put(event) |
77 | worker_flush() | ||
78 | 79 | ||
79 | def worker_flush(): | 80 | # |
80 | global worker_queue, worker_pipe | 81 | # We can end up with write contention with the cooker, it can be trying to send commands |
82 | # and we can be trying to send event data back. Therefore use a separate thread for writing | ||
83 | # back data to cooker. | ||
84 | # | ||
85 | worker_thread_exit = False | ||
81 | 86 | ||
82 | if not worker_queue: | 87 | def worker_flush(worker_queue): |
83 | return | 88 | worker_queue_int = b"" |
89 | global worker_pipe, worker_thread_exit | ||
84 | 90 | ||
85 | try: | 91 | while True: |
86 | written = os.write(worker_pipe, worker_queue) | 92 | try: |
87 | worker_queue = worker_queue[written:] | 93 | worker_queue_int = worker_queue_int + worker_queue.get(True, 1) |
88 | except (IOError, OSError) as e: | 94 | except queue.Empty: |
89 | if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: | 95 | pass |
90 | raise | 96 | while (worker_queue_int or not worker_queue.empty()): |
97 | try: | ||
98 | if not worker_queue.empty(): | ||
99 | worker_queue_int = worker_queue_int + worker_queue.get() | ||
100 | written = os.write(worker_pipe, worker_queue_int) | ||
101 | worker_queue_int = worker_queue_int[written:] | ||
102 | except (IOError, OSError) as e: | ||
103 | if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: | ||
104 | raise | ||
105 | if worker_thread_exit and worker_queue.empty() and not worker_queue_int: | ||
106 | return | ||
107 | |||
108 | worker_thread = Thread(target=worker_flush, args=(worker_queue,)) | ||
109 | worker_thread.start() | ||
91 | 110 | ||
92 | def worker_child_fire(event, d): | 111 | def worker_child_fire(event, d): |
93 | global worker_pipe | 112 | global worker_pipe |
@@ -353,7 +372,6 @@ class BitbakeWorker(object): | |||
353 | self.build_pipes[pipe].read() | 372 | self.build_pipes[pipe].read() |
354 | if len(self.build_pids): | 373 | if len(self.build_pids): |
355 | self.process_waitpid() | 374 | self.process_waitpid() |
356 | worker_flush() | ||
357 | 375 | ||
358 | 376 | ||
359 | def handle_item(self, item, func): | 377 | def handle_item(self, item, func): |
@@ -458,8 +476,10 @@ except BaseException as e: | |||
458 | import traceback | 476 | import traceback |
459 | sys.stderr.write(traceback.format_exc()) | 477 | sys.stderr.write(traceback.format_exc()) |
460 | sys.stderr.write(str(e)) | 478 | sys.stderr.write(str(e)) |
461 | while len(worker_queue): | 479 | |
462 | worker_flush() | 480 | worker_thread_exit = True |
481 | worker_thread.join() | ||
482 | |||
463 | workerlog_write("exitting") | 483 | workerlog_write("exitting") |
464 | sys.exit(0) | 484 | sys.exit(0) |
465 | 485 | ||