summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbitbake/bin/bitbake-worker52
1 files changed, 36 insertions, 16 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
index af66ff05c8..db3c4b184f 100755
--- a/bitbake/bin/bitbake-worker
+++ b/bitbake/bin/bitbake-worker
@@ -12,7 +12,9 @@ import errno
12import signal 12import signal
13import pickle 13import pickle
14import traceback 14import traceback
15import queue
15from multiprocessing import Lock 16from multiprocessing import Lock
17from threading import Thread
16 18
17if sys.getfilesystemencoding() != "utf-8": 19if sys.getfilesystemencoding() != "utf-8":
18 sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.") 20 sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")
@@ -64,7 +66,7 @@ if 0:
64 consolelog.setFormatter(conlogformat) 66 consolelog.setFormatter(conlogformat)
65 logger.addHandler(consolelog) 67 logger.addHandler(consolelog)
66 68
67worker_queue = b"" 69worker_queue = queue.Queue()
68 70
69def worker_fire(event, d): 71def worker_fire(event, d):
70 data = b"<event>" + pickle.dumps(event) + b"</event>" 72 data = b"<event>" + pickle.dumps(event) + b"</event>"
@@ -73,21 +75,38 @@ def worker_fire(event, d):
73def worker_fire_prepickled(event): 75def worker_fire_prepickled(event):
74 global worker_queue 76 global worker_queue
75 77
76 worker_queue = worker_queue + event 78 worker_queue.put(event)
77 worker_flush()
78 79
79def worker_flush(): 80#
80 global worker_queue, worker_pipe 81# We can end up with write contention with the cooker, it can be trying to send commands
82# and we can be trying to send event data back. Therefore use a separate thread for writing
83# back data to cooker.
84#
85worker_thread_exit = False
81 86
82 if not worker_queue: 87def worker_flush(worker_queue):
83 return 88 worker_queue_int = b""
89 global worker_pipe, worker_thread_exit
84 90
85 try: 91 while True:
86 written = os.write(worker_pipe, worker_queue) 92 try:
87 worker_queue = worker_queue[written:] 93 worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
88 except (IOError, OSError) as e: 94 except queue.Empty:
89 if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: 95 pass
90 raise 96 while (worker_queue_int or not worker_queue.empty()):
97 try:
98 if not worker_queue.empty():
99 worker_queue_int = worker_queue_int + worker_queue.get()
100 written = os.write(worker_pipe, worker_queue_int)
101 worker_queue_int = worker_queue_int[written:]
102 except (IOError, OSError) as e:
103 if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
104 raise
105 if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
106 return
107
108worker_thread = Thread(target=worker_flush, args=(worker_queue,))
109worker_thread.start()
91 110
92def worker_child_fire(event, d): 111def worker_child_fire(event, d):
93 global worker_pipe 112 global worker_pipe
@@ -353,7 +372,6 @@ class BitbakeWorker(object):
353 self.build_pipes[pipe].read() 372 self.build_pipes[pipe].read()
354 if len(self.build_pids): 373 if len(self.build_pids):
355 self.process_waitpid() 374 self.process_waitpid()
356 worker_flush()
357 375
358 376
359 def handle_item(self, item, func): 377 def handle_item(self, item, func):
@@ -458,8 +476,10 @@ except BaseException as e:
458 import traceback 476 import traceback
459 sys.stderr.write(traceback.format_exc()) 477 sys.stderr.write(traceback.format_exc())
460 sys.stderr.write(str(e)) 478 sys.stderr.write(str(e))
461while len(worker_queue): 479
462 worker_flush() 480worker_thread_exit = True
481worker_thread.join()
482
463workerlog_write("exitting") 483workerlog_write("exitting")
464sys.exit(0) 484sys.exit(0)
465 485