diff options
Diffstat (limited to 'bitbake/bin/bitbake-worker')
-rwxr-xr-x | bitbake/bin/bitbake-worker | 52 |
1 files changed, 36 insertions, 16 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index af66ff05c8..db3c4b184f 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker | |||
@@ -12,7 +12,9 @@ import errno | |||
12 | import signal | 12 | import signal |
13 | import pickle | 13 | import pickle |
14 | import traceback | 14 | import traceback |
15 | import queue | ||
15 | from multiprocessing import Lock | 16 | from multiprocessing import Lock |
17 | from threading import Thread | ||
16 | 18 | ||
17 | if sys.getfilesystemencoding() != "utf-8": | 19 | if sys.getfilesystemencoding() != "utf-8": |
18 | sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.") | 20 | sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.") |
@@ -64,7 +66,7 @@ if 0: | |||
64 | consolelog.setFormatter(conlogformat) | 66 | consolelog.setFormatter(conlogformat) |
65 | logger.addHandler(consolelog) | 67 | logger.addHandler(consolelog) |
66 | 68 | ||
67 | worker_queue = b"" | 69 | worker_queue = queue.Queue() |
68 | 70 | ||
69 | def worker_fire(event, d): | 71 | def worker_fire(event, d): |
70 | data = b"<event>" + pickle.dumps(event) + b"</event>" | 72 | data = b"<event>" + pickle.dumps(event) + b"</event>" |
@@ -73,21 +75,38 @@ def worker_fire(event, d): | |||
73 | def worker_fire_prepickled(event): | 75 | def worker_fire_prepickled(event): |
74 | global worker_queue | 76 | global worker_queue |
75 | 77 | ||
76 | worker_queue = worker_queue + event | 78 | worker_queue.put(event) |
77 | worker_flush() | ||
78 | 79 | ||
79 | def worker_flush(): | 80 | # |
80 | global worker_queue, worker_pipe | 81 | # We can end up with write contention with the cooker, it can be trying to send commands |
82 | # and we can be trying to send event data back. Therefore use a separate thread for writing | ||
83 | # back data to cooker. | ||
84 | # | ||
85 | worker_thread_exit = False | ||
81 | 86 | ||
82 | if not worker_queue: | 87 | def worker_flush(worker_queue): |
83 | return | 88 | worker_queue_int = b"" |
89 | global worker_pipe, worker_thread_exit | ||
84 | 90 | ||
85 | try: | 91 | while True: |
86 | written = os.write(worker_pipe, worker_queue) | 92 | try: |
87 | worker_queue = worker_queue[written:] | 93 | worker_queue_int = worker_queue_int + worker_queue.get(True, 1) |
88 | except (IOError, OSError) as e: | 94 | except queue.Empty: |
89 | if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: | 95 | pass |
90 | raise | 96 | while (worker_queue_int or not worker_queue.empty()): |
97 | try: | ||
98 | if not worker_queue.empty(): | ||
99 | worker_queue_int = worker_queue_int + worker_queue.get() | ||
100 | written = os.write(worker_pipe, worker_queue_int) | ||
101 | worker_queue_int = worker_queue_int[written:] | ||
102 | except (IOError, OSError) as e: | ||
103 | if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: | ||
104 | raise | ||
105 | if worker_thread_exit and worker_queue.empty() and not worker_queue_int: | ||
106 | return | ||
107 | |||
108 | worker_thread = Thread(target=worker_flush, args=(worker_queue,)) | ||
109 | worker_thread.start() | ||
91 | 110 | ||
92 | def worker_child_fire(event, d): | 111 | def worker_child_fire(event, d): |
93 | global worker_pipe | 112 | global worker_pipe |
@@ -353,7 +372,6 @@ class BitbakeWorker(object): | |||
353 | self.build_pipes[pipe].read() | 372 | self.build_pipes[pipe].read() |
354 | if len(self.build_pids): | 373 | if len(self.build_pids): |
355 | self.process_waitpid() | 374 | self.process_waitpid() |
356 | worker_flush() | ||
357 | 375 | ||
358 | 376 | ||
359 | def handle_item(self, item, func): | 377 | def handle_item(self, item, func): |
@@ -458,8 +476,10 @@ except BaseException as e: | |||
458 | import traceback | 476 | import traceback |
459 | sys.stderr.write(traceback.format_exc()) | 477 | sys.stderr.write(traceback.format_exc()) |
460 | sys.stderr.write(str(e)) | 478 | sys.stderr.write(str(e)) |
461 | while len(worker_queue): | 479 | |
462 | worker_flush() | 480 | worker_thread_exit = True |
481 | worker_thread.join() | ||
482 | |||
463 | workerlog_write("exitting") | 483 | workerlog_write("exitting") |
464 | sys.exit(0) | 484 | sys.exit(0) |
465 | 485 | ||