diff options
| -rwxr-xr-x | bitbake/bin/bitbake-worker | 52 |
1 files changed, 36 insertions, 16 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index af66ff05c8..db3c4b184f 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker | |||
| @@ -12,7 +12,9 @@ import errno | |||
| 12 | import signal | 12 | import signal |
| 13 | import pickle | 13 | import pickle |
| 14 | import traceback | 14 | import traceback |
| 15 | import queue | ||
| 15 | from multiprocessing import Lock | 16 | from multiprocessing import Lock |
| 17 | from threading import Thread | ||
| 16 | 18 | ||
| 17 | if sys.getfilesystemencoding() != "utf-8": | 19 | if sys.getfilesystemencoding() != "utf-8": |
| 18 | sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.") | 20 | sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.") |
| @@ -64,7 +66,7 @@ if 0: | |||
| 64 | consolelog.setFormatter(conlogformat) | 66 | consolelog.setFormatter(conlogformat) |
| 65 | logger.addHandler(consolelog) | 67 | logger.addHandler(consolelog) |
| 66 | 68 | ||
| 67 | worker_queue = b"" | 69 | worker_queue = queue.Queue() |
| 68 | 70 | ||
| 69 | def worker_fire(event, d): | 71 | def worker_fire(event, d): |
| 70 | data = b"<event>" + pickle.dumps(event) + b"</event>" | 72 | data = b"<event>" + pickle.dumps(event) + b"</event>" |
| @@ -73,21 +75,38 @@ def worker_fire(event, d): | |||
| 73 | def worker_fire_prepickled(event): | 75 | def worker_fire_prepickled(event): |
| 74 | global worker_queue | 76 | global worker_queue |
| 75 | 77 | ||
| 76 | worker_queue = worker_queue + event | 78 | worker_queue.put(event) |
| 77 | worker_flush() | ||
| 78 | 79 | ||
| 79 | def worker_flush(): | 80 | # |
| 80 | global worker_queue, worker_pipe | 81 | # We can end up with write contention with the cooker, it can be trying to send commands |
| 82 | # and we can be trying to send event data back. Therefore use a separate thread for writing | ||
| 83 | # back data to cooker. | ||
| 84 | # | ||
| 85 | worker_thread_exit = False | ||
| 81 | 86 | ||
| 82 | if not worker_queue: | 87 | def worker_flush(worker_queue): |
| 83 | return | 88 | worker_queue_int = b"" |
| 89 | global worker_pipe, worker_thread_exit | ||
| 84 | 90 | ||
| 85 | try: | 91 | while True: |
| 86 | written = os.write(worker_pipe, worker_queue) | 92 | try: |
| 87 | worker_queue = worker_queue[written:] | 93 | worker_queue_int = worker_queue_int + worker_queue.get(True, 1) |
| 88 | except (IOError, OSError) as e: | 94 | except queue.Empty: |
| 89 | if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: | 95 | pass |
| 90 | raise | 96 | while (worker_queue_int or not worker_queue.empty()): |
| 97 | try: | ||
| 98 | if not worker_queue.empty(): | ||
| 99 | worker_queue_int = worker_queue_int + worker_queue.get() | ||
| 100 | written = os.write(worker_pipe, worker_queue_int) | ||
| 101 | worker_queue_int = worker_queue_int[written:] | ||
| 102 | except (IOError, OSError) as e: | ||
| 103 | if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: | ||
| 104 | raise | ||
| 105 | if worker_thread_exit and worker_queue.empty() and not worker_queue_int: | ||
| 106 | return | ||
| 107 | |||
| 108 | worker_thread = Thread(target=worker_flush, args=(worker_queue,)) | ||
| 109 | worker_thread.start() | ||
| 91 | 110 | ||
| 92 | def worker_child_fire(event, d): | 111 | def worker_child_fire(event, d): |
| 93 | global worker_pipe | 112 | global worker_pipe |
| @@ -353,7 +372,6 @@ class BitbakeWorker(object): | |||
| 353 | self.build_pipes[pipe].read() | 372 | self.build_pipes[pipe].read() |
| 354 | if len(self.build_pids): | 373 | if len(self.build_pids): |
| 355 | self.process_waitpid() | 374 | self.process_waitpid() |
| 356 | worker_flush() | ||
| 357 | 375 | ||
| 358 | 376 | ||
| 359 | def handle_item(self, item, func): | 377 | def handle_item(self, item, func): |
| @@ -458,8 +476,10 @@ except BaseException as e: | |||
| 458 | import traceback | 476 | import traceback |
| 459 | sys.stderr.write(traceback.format_exc()) | 477 | sys.stderr.write(traceback.format_exc()) |
| 460 | sys.stderr.write(str(e)) | 478 | sys.stderr.write(str(e)) |
| 461 | while len(worker_queue): | 479 | |
| 462 | worker_flush() | 480 | worker_thread_exit = True |
| 481 | worker_thread.join() | ||
| 482 | |||
| 463 | workerlog_write("exitting") | 483 | workerlog_write("exitting") |
| 464 | sys.exit(0) | 484 | sys.exit(0) |
| 465 | 485 | ||
