summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/server/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r--bitbake/lib/bb/server/process.py402
1 files changed, 273 insertions, 129 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py
index b27b4aefe0..4b35be62cd 100644
--- a/bitbake/lib/bb/server/process.py
+++ b/bitbake/lib/bb/server/process.py
@@ -26,6 +26,9 @@ import errno
26import re 26import re
27import datetime 27import datetime
28import pickle 28import pickle
29import traceback
30import gc
31import stat
29import bb.server.xmlrpcserver 32import bb.server.xmlrpcserver
30from bb import daemonize 33from bb import daemonize
31from multiprocessing import queues 34from multiprocessing import queues
@@ -35,9 +38,46 @@ logger = logging.getLogger('BitBake')
35class ProcessTimeout(SystemExit): 38class ProcessTimeout(SystemExit):
36 pass 39 pass
37 40
41def currenttime():
42 return datetime.datetime.now().strftime('%H:%M:%S.%f')
43
38def serverlog(msg): 44def serverlog(msg):
39 print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg) 45 print(str(os.getpid()) + " " + currenttime() + " " + msg)
40 sys.stdout.flush() 46 #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server
47 #sys.stdout.flush()
48
49#
50# When we have lockfile issues, try and find infomation about which process is
51# using the lockfile
52#
53def get_lockfile_process_msg(lockfile):
54 # Some systems may not have lsof available
55 procs = None
56 try:
57 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
58 except subprocess.CalledProcessError:
59 # File was deleted?
60 pass
61 except OSError as e:
62 if e.errno != errno.ENOENT:
63 raise
64 if procs is None:
65 # Fall back to fuser if lsof is unavailable
66 try:
67 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
68 except subprocess.CalledProcessError:
69 # File was deleted?
70 pass
71 except OSError as e:
72 if e.errno != errno.ENOENT:
73 raise
74 if procs:
75 return procs.decode("utf-8")
76 return None
77
78class idleFinish():
79 def __init__(self, msg):
80 self.msg = msg
41 81
42class ProcessServer(): 82class ProcessServer():
43 profile_filename = "profile.log" 83 profile_filename = "profile.log"
@@ -56,12 +96,19 @@ class ProcessServer():
56 self.maxuiwait = 30 96 self.maxuiwait = 30
57 self.xmlrpc = False 97 self.xmlrpc = False
58 98
99 self.idle = None
100 # Need a lock for _idlefuns changes
59 self._idlefuns = {} 101 self._idlefuns = {}
102 self._idlefuncsLock = threading.Lock()
103 self.idle_cond = threading.Condition(self._idlefuncsLock)
60 104
61 self.bitbake_lock = lock 105 self.bitbake_lock = lock
62 self.bitbake_lock_name = lockname 106 self.bitbake_lock_name = lockname
63 self.sock = sock 107 self.sock = sock
64 self.sockname = sockname 108 self.sockname = sockname
109 # It is possible the directory may be renamed. Cache the inode of the socket file
110 # so we can tell if things changed.
111 self.sockinode = os.stat(self.sockname)[stat.ST_INO]
65 112
66 self.server_timeout = server_timeout 113 self.server_timeout = server_timeout
67 self.timeout = self.server_timeout 114 self.timeout = self.server_timeout
@@ -70,7 +117,9 @@ class ProcessServer():
70 def register_idle_function(self, function, data): 117 def register_idle_function(self, function, data):
71 """Register a function to be called while the server is idle""" 118 """Register a function to be called while the server is idle"""
72 assert hasattr(function, '__call__') 119 assert hasattr(function, '__call__')
73 self._idlefuns[function] = data 120 with bb.utils.lock_timeout(self._idlefuncsLock):
121 self._idlefuns[function] = data
122 serverlog("Registering idle function %s" % str(function))
74 123
75 def run(self): 124 def run(self):
76 125
@@ -109,6 +158,31 @@ class ProcessServer():
109 158
110 return ret 159 return ret
111 160
161 def _idle_check(self):
162 return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None
163
164 def wait_for_idle(self, timeout=30):
165 # Wait for the idle loop to have cleared
166 with bb.utils.lock_timeout(self._idlefuncsLock):
167 return self.idle_cond.wait_for(self._idle_check, timeout) is not False
168
169 def set_async_cmd(self, cmd):
170 with bb.utils.lock_timeout(self._idlefuncsLock):
171 ret = self.idle_cond.wait_for(self._idle_check, 30)
172 if ret is False:
173 return False
174 self.cooker.command.currentAsyncCommand = cmd
175 return True
176
177 def clear_async_cmd(self):
178 with bb.utils.lock_timeout(self._idlefuncsLock):
179 self.cooker.command.currentAsyncCommand = None
180 self.idle_cond.notify_all()
181
182 def get_async_cmd(self):
183 with bb.utils.lock_timeout(self._idlefuncsLock):
184 return self.cooker.command.currentAsyncCommand
185
112 def main(self): 186 def main(self):
113 self.cooker.pre_serve() 187 self.cooker.pre_serve()
114 188
@@ -123,14 +197,19 @@ class ProcessServer():
123 fds.append(self.xmlrpc) 197 fds.append(self.xmlrpc)
124 seendata = False 198 seendata = False
125 serverlog("Entering server connection loop") 199 serverlog("Entering server connection loop")
200 serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))
126 201
127 def disconnect_client(self, fds): 202 def disconnect_client(self, fds):
128 serverlog("Disconnecting Client") 203 serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
129 if self.controllersock: 204 if self.controllersock:
130 fds.remove(self.controllersock) 205 fds.remove(self.controllersock)
131 self.controllersock.close() 206 self.controllersock.close()
132 self.controllersock = False 207 self.controllersock = False
133 if self.haveui: 208 if self.haveui:
209 # Wait for the idle loop to have cleared (30s max)
210 if not self.wait_for_idle(30):
211 serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
212 self.quit = True
134 fds.remove(self.command_channel) 213 fds.remove(self.command_channel)
135 bb.event.unregister_UIHhandler(self.event_handle, True) 214 bb.event.unregister_UIHhandler(self.event_handle, True)
136 self.command_channel_reply.writer.close() 215 self.command_channel_reply.writer.close()
@@ -142,12 +221,12 @@ class ProcessServer():
142 self.cooker.clientComplete() 221 self.cooker.clientComplete()
143 self.haveui = False 222 self.haveui = False
144 ready = select.select(fds,[],[],0)[0] 223 ready = select.select(fds,[],[],0)[0]
145 if newconnections: 224 if newconnections and not self.quit:
146 serverlog("Starting new client") 225 serverlog("Starting new client")
147 conn = newconnections.pop(-1) 226 conn = newconnections.pop(-1)
148 fds.append(conn) 227 fds.append(conn)
149 self.controllersock = conn 228 self.controllersock = conn
150 elif self.timeout is None and not ready: 229 elif not self.timeout and not ready:
151 serverlog("No timeout, exiting.") 230 serverlog("No timeout, exiting.")
152 self.quit = True 231 self.quit = True
153 232
@@ -214,11 +293,14 @@ class ProcessServer():
214 continue 293 continue
215 try: 294 try:
216 serverlog("Running command %s" % command) 295 serverlog("Running command %s" % command)
217 self.command_channel_reply.send(self.cooker.command.runCommand(command)) 296 reply = self.cooker.command.runCommand(command, self)
218 serverlog("Command Completed") 297 serverlog("Sending reply %s" % repr(reply))
298 self.command_channel_reply.send(reply)
299 serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
219 except Exception as e: 300 except Exception as e:
220 serverlog('Exception in server main event loop running command %s (%s)' % (command, str(e))) 301 stack = traceback.format_exc()
221 logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e))) 302 serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
303 logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))
222 304
223 if self.xmlrpc in ready: 305 if self.xmlrpc in ready:
224 self.xmlrpc.handle_requests() 306 self.xmlrpc.handle_requests()
@@ -239,21 +321,42 @@ class ProcessServer():
239 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) 321 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
240 seendata = True 322 seendata = True
241 323
242 ready = self.idle_commands(.1, fds) 324 if not self.idle:
325 self.idle = threading.Thread(target=self.idle_thread)
326 self.idle.start()
327 elif self.idle and not self.idle.is_alive():
328 serverlog("Idle thread terminated, main thread exiting too")
329 bb.error("Idle thread terminated, main thread exiting too")
330 self.quit = True
243 331
244 if len(threading.enumerate()) != 1: 332 nextsleep = 1.0
245 serverlog("More than one thread left?: " + str(threading.enumerate())) 333 if self.xmlrpc:
334 nextsleep = self.xmlrpc.get_timeout(nextsleep)
335 try:
336 ready = select.select(fds,[],[],nextsleep)[0]
337 except InterruptedError:
338 # Ignore EINTR
339 ready = []
340
341 if self.idle:
342 self.idle.join()
246 343
247 serverlog("Exiting") 344 serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
248 # Remove the socket file so we don't get any more connections to avoid races 345 # Remove the socket file so we don't get any more connections to avoid races
346 # The build directory could have been renamed so if the file isn't the one we created
347 # we shouldn't delete it.
249 try: 348 try:
250 os.unlink(self.sockname) 349 sockinode = os.stat(self.sockname)[stat.ST_INO]
251 except: 350 if sockinode == self.sockinode:
252 pass 351 os.unlink(self.sockname)
352 else:
353 serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
354 except Exception as err:
355 serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
253 self.sock.close() 356 self.sock.close()
254 357
255 try: 358 try:
256 self.cooker.shutdown(True) 359 self.cooker.shutdown(True, idle=False)
257 self.cooker.notifier.stop() 360 self.cooker.notifier.stop()
258 self.cooker.confignotifier.stop() 361 self.cooker.confignotifier.stop()
259 except: 362 except:
@@ -261,6 +364,9 @@ class ProcessServer():
261 364
262 self.cooker.post_serve() 365 self.cooker.post_serve()
263 366
367 if len(threading.enumerate()) != 1:
368 serverlog("More than one thread left?: " + str(threading.enumerate()))
369
264 # Flush logs before we release the lock 370 # Flush logs before we release the lock
265 sys.stdout.flush() 371 sys.stdout.flush()
266 sys.stderr.flush() 372 sys.stderr.flush()
@@ -276,20 +382,21 @@ class ProcessServer():
276 except FileNotFoundError: 382 except FileNotFoundError:
277 return None 383 return None
278 384
279 lockcontents = get_lock_contents(lockfile)
280 serverlog("Original lockfile contents: " + str(lockcontents))
281
282 lock.close() 385 lock.close()
283 lock = None 386 lock = None
284 387
285 while not lock: 388 while not lock:
286 i = 0 389 i = 0
287 lock = None 390 lock = None
391 if not os.path.exists(os.path.basename(lockfile)):
392 serverlog("Lockfile directory gone, exiting.")
393 return
394
288 while not lock and i < 30: 395 while not lock and i < 30:
289 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) 396 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
290 if not lock: 397 if not lock:
291 newlockcontents = get_lock_contents(lockfile) 398 newlockcontents = get_lock_contents(lockfile)
292 if newlockcontents != lockcontents: 399 if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]):
293 # A new server was started, the lockfile contents changed, we can exit 400 # A new server was started, the lockfile contents changed, we can exit
294 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) 401 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
295 return 402 return
@@ -303,87 +410,95 @@ class ProcessServer():
303 return 410 return
304 411
305 if not lock: 412 if not lock:
306 # Some systems may not have lsof available 413 procs = get_lockfile_process_msg(lockfile)
307 procs = None 414 msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
308 try:
309 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
310 except subprocess.CalledProcessError:
311 # File was deleted?
312 continue
313 except OSError as e:
314 if e.errno != errno.ENOENT:
315 raise
316 if procs is None:
317 # Fall back to fuser if lsof is unavailable
318 try:
319 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
320 except subprocess.CalledProcessError:
321 # File was deleted?
322 continue
323 except OSError as e:
324 if e.errno != errno.ENOENT:
325 raise
326
327 msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
328 if procs: 415 if procs:
329 msg += ":\n%s" % str(procs.decode("utf-8")) 416 msg.append(":\n%s" % procs)
330 serverlog(msg) 417 serverlog("".join(msg))
331 418
332 def idle_commands(self, delay, fds=None): 419 def idle_thread(self):
333 nextsleep = delay 420 if self.cooker.configuration.profile:
334 if not fds:
335 fds = []
336
337 for function, data in list(self._idlefuns.items()):
338 try: 421 try:
339 retval = function(self, data, False) 422 import cProfile as profile
340 if retval is False: 423 except:
341 del self._idlefuns[function] 424 import profile
342 nextsleep = None 425 prof = profile.Profile()
343 elif retval is True:
344 nextsleep = None
345 elif isinstance(retval, float) and nextsleep:
346 if (retval < nextsleep):
347 nextsleep = retval
348 elif nextsleep is None:
349 continue
350 else:
351 fds = fds + retval
352 except SystemExit:
353 raise
354 except Exception as exc:
355 if not isinstance(exc, bb.BBHandledException):
356 logger.exception('Running idle function')
357 del self._idlefuns[function]
358 self.quit = True
359 426
360 # Create new heartbeat event? 427 ret = profile.Profile.runcall(prof, self.idle_thread_internal)
361 now = time.time() 428
362 if now >= self.next_heartbeat: 429 prof.dump_stats("profile-mainloop.log")
363 # We might have missed heartbeats. Just trigger once in 430 bb.utils.process_profilelog("profile-mainloop.log")
364 # that case and continue after the usual delay. 431 serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed")
365 self.next_heartbeat += self.heartbeat_seconds
366 if self.next_heartbeat <= now:
367 self.next_heartbeat = now + self.heartbeat_seconds
368 if hasattr(self.cooker, "data"):
369 heartbeat = bb.event.HeartbeatEvent(now)
370 bb.event.fire(heartbeat, self.cooker.data)
371 if nextsleep and now + nextsleep > self.next_heartbeat:
372 # Shorten timeout so that we we wake up in time for
373 # the heartbeat.
374 nextsleep = self.next_heartbeat - now
375
376 if nextsleep is not None:
377 if self.xmlrpc:
378 nextsleep = self.xmlrpc.get_timeout(nextsleep)
379 try:
380 return select.select(fds,[],[],nextsleep)[0]
381 except InterruptedError:
382 # Ignore EINTR
383 return []
384 else: 432 else:
385 return select.select(fds,[],[],0)[0] 433 self.idle_thread_internal()
434
435 def idle_thread_internal(self):
436 def remove_idle_func(function):
437 with bb.utils.lock_timeout(self._idlefuncsLock):
438 del self._idlefuns[function]
439 self.idle_cond.notify_all()
440
441 while not self.quit:
442 nextsleep = 1.0
443 fds = []
444
445 with bb.utils.lock_timeout(self._idlefuncsLock):
446 items = list(self._idlefuns.items())
386 447
448 for function, data in items:
449 try:
450 retval = function(self, data, False)
451 if isinstance(retval, idleFinish):
452 serverlog("Removing idle function %s at idleFinish" % str(function))
453 remove_idle_func(function)
454 self.cooker.command.finishAsyncCommand(retval.msg)
455 nextsleep = None
456 elif retval is False:
457 serverlog("Removing idle function %s" % str(function))
458 remove_idle_func(function)
459 nextsleep = None
460 elif retval is True:
461 nextsleep = None
462 elif isinstance(retval, float) and nextsleep:
463 if (retval < nextsleep):
464 nextsleep = retval
465 elif nextsleep is None:
466 continue
467 else:
468 fds = fds + retval
469 except SystemExit:
470 raise
471 except Exception as exc:
472 if not isinstance(exc, bb.BBHandledException):
473 logger.exception('Running idle function')
474 remove_idle_func(function)
475 serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
476 self.quit = True
477
478 # Create new heartbeat event?
479 now = time.time()
480 if items and bb.event._heartbeat_enabled and now >= self.next_heartbeat:
481 # We might have missed heartbeats. Just trigger once in
482 # that case and continue after the usual delay.
483 self.next_heartbeat += self.heartbeat_seconds
484 if self.next_heartbeat <= now:
485 self.next_heartbeat = now + self.heartbeat_seconds
486 if hasattr(self.cooker, "data"):
487 heartbeat = bb.event.HeartbeatEvent(now)
488 try:
489 bb.event.fire(heartbeat, self.cooker.data)
490 except Exception as exc:
491 if not isinstance(exc, bb.BBHandledException):
492 logger.exception('Running heartbeat function')
493 serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
494 self.quit = True
495 if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
496 # Shorten timeout so that we we wake up in time for
497 # the heartbeat.
498 nextsleep = self.next_heartbeat - now
499
500 if nextsleep is not None:
501 select.select(fds,[],[],nextsleep)[0]
387 502
388class ServerCommunicator(): 503class ServerCommunicator():
389 def __init__(self, connection, recv): 504 def __init__(self, connection, recv):
@@ -391,12 +506,18 @@ class ServerCommunicator():
391 self.recv = recv 506 self.recv = recv
392 507
393 def runCommand(self, command): 508 def runCommand(self, command):
394 self.connection.send(command) 509 try:
510 self.connection.send(command)
511 except BrokenPipeError as e:
512 raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
395 if not self.recv.poll(30): 513 if not self.recv.poll(30):
396 logger.info("No reply from server in 30s") 514 logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
397 if not self.recv.poll(30): 515 if not self.recv.poll(30):
398 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)") 516 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())
399 ret, exc = self.recv.get() 517 try:
518 ret, exc = self.recv.get()
519 except EOFError as e:
520 raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
400 # Should probably turn all exceptions in exc back into exceptions? 521 # Should probably turn all exceptions in exc back into exceptions?
401 # For now, at least handle BBHandledException 522 # For now, at least handle BBHandledException
402 if exc and ("BBHandledException" in exc or "SystemExit" in exc): 523 if exc and ("BBHandledException" in exc or "SystemExit" in exc):
@@ -429,6 +550,7 @@ class BitBakeProcessServerConnection(object):
429 self.socket_connection = sock 550 self.socket_connection = sock
430 551
431 def terminate(self): 552 def terminate(self):
553 self.events.close()
432 self.socket_connection.close() 554 self.socket_connection.close()
433 self.connection.connection.close() 555 self.connection.connection.close()
434 self.connection.recv.close() 556 self.connection.recv.close()
@@ -439,13 +561,14 @@ start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
439 561
440class BitBakeServer(object): 562class BitBakeServer(object):
441 563
442 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface): 564 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
443 565
444 self.server_timeout = server_timeout 566 self.server_timeout = server_timeout
445 self.xmlrpcinterface = xmlrpcinterface 567 self.xmlrpcinterface = xmlrpcinterface
446 self.featureset = featureset 568 self.featureset = featureset
447 self.sockname = sockname 569 self.sockname = sockname
448 self.bitbake_lock = lock 570 self.bitbake_lock = lock
571 self.profile = profile
449 self.readypipe, self.readypipein = os.pipe() 572 self.readypipe, self.readypipein = os.pipe()
450 573
451 # Place the log in the builddirectory alongside the lock file 574 # Place the log in the builddirectory alongside the lock file
@@ -466,7 +589,7 @@ class BitBakeServer(object):
466 try: 589 try:
467 r = ready.get() 590 r = ready.get()
468 except EOFError: 591 except EOFError:
469 # Trap the child exitting/closing the pipe and error out 592 # Trap the child exiting/closing the pipe and error out
470 r = None 593 r = None
471 if not r or r[0] != "r": 594 if not r or r[0] != "r":
472 ready.close() 595 ready.close()
@@ -509,9 +632,9 @@ class BitBakeServer(object):
509 os.set_inheritable(self.bitbake_lock.fileno(), True) 632 os.set_inheritable(self.bitbake_lock.fileno(), True)
510 os.set_inheritable(self.readypipein, True) 633 os.set_inheritable(self.readypipein, True)
511 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") 634 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
512 os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) 635 os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
513 636
514def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface): 637def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
515 638
516 import bb.cookerdata 639 import bb.cookerdata
517 import bb.cooker 640 import bb.cooker
@@ -523,6 +646,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
523 646
524 # Create server control socket 647 # Create server control socket
525 if os.path.exists(sockname): 648 if os.path.exists(sockname):
649 serverlog("WARNING: removing existing socket file '%s'" % sockname)
526 os.unlink(sockname) 650 os.unlink(sockname)
527 651
528 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 652 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -539,7 +663,8 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
539 writer = ConnectionWriter(readypipeinfd) 663 writer = ConnectionWriter(readypipeinfd)
540 try: 664 try:
541 featureset = [] 665 featureset = []
542 cooker = bb.cooker.BBCooker(featureset, server.register_idle_function) 666 cooker = bb.cooker.BBCooker(featureset, server)
667 cooker.configuration.profile = profile
543 except bb.BBHandledException: 668 except bb.BBHandledException:
544 return None 669 return None
545 writer.send("r") 670 writer.send("r")
@@ -549,7 +674,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
549 674
550 server.run() 675 server.run()
551 finally: 676 finally:
552 # Flush any ,essages/errors to the logfile before exit 677 # Flush any messages/errors to the logfile before exit
553 sys.stdout.flush() 678 sys.stdout.flush()
554 sys.stderr.flush() 679 sys.stderr.flush()
555 680
@@ -654,23 +779,18 @@ class BBUIEventQueue:
654 self.reader = ConnectionReader(readfd) 779 self.reader = ConnectionReader(readfd)
655 780
656 self.t = threading.Thread() 781 self.t = threading.Thread()
657 self.t.setDaemon(True)
658 self.t.run = self.startCallbackHandler 782 self.t.run = self.startCallbackHandler
659 self.t.start() 783 self.t.start()
660 784
661 def getEvent(self): 785 def getEvent(self):
662 self.eventQueueLock.acquire() 786 with bb.utils.lock_timeout(self.eventQueueLock):
663 787 if len(self.eventQueue) == 0:
664 if len(self.eventQueue) == 0: 788 return None
665 self.eventQueueLock.release()
666 return None
667
668 item = self.eventQueue.pop(0)
669 789
670 if len(self.eventQueue) == 0: 790 item = self.eventQueue.pop(0)
671 self.eventQueueNotify.clear() 791 if len(self.eventQueue) == 0:
792 self.eventQueueNotify.clear()
672 793
673 self.eventQueueLock.release()
674 return item 794 return item
675 795
676 def waitEvent(self, delay): 796 def waitEvent(self, delay):
@@ -678,10 +798,9 @@ class BBUIEventQueue:
678 return self.getEvent() 798 return self.getEvent()
679 799
680 def queue_event(self, event): 800 def queue_event(self, event):
681 self.eventQueueLock.acquire() 801 with bb.utils.lock_timeout(self.eventQueueLock):
682 self.eventQueue.append(event) 802 self.eventQueue.append(event)
683 self.eventQueueNotify.set() 803 self.eventQueueNotify.set()
684 self.eventQueueLock.release()
685 804
686 def send_event(self, event): 805 def send_event(self, event):
687 self.queue_event(pickle.loads(event)) 806 self.queue_event(pickle.loads(event))
@@ -690,13 +809,17 @@ class BBUIEventQueue:
690 bb.utils.set_process_name("UIEventQueue") 809 bb.utils.set_process_name("UIEventQueue")
691 while True: 810 while True:
692 try: 811 try:
693 self.reader.wait() 812 ready = self.reader.wait(0.25)
694 event = self.reader.get() 813 if ready:
695 self.queue_event(event) 814 event = self.reader.get()
696 except EOFError: 815 self.queue_event(event)
816 except (EOFError, OSError, TypeError):
697 # Easiest way to exit is to close the file descriptor to cause an exit 817 # Easiest way to exit is to close the file descriptor to cause an exit
698 break 818 break
819
820 def close(self):
699 self.reader.close() 821 self.reader.close()
822 self.t.join()
700 823
701class ConnectionReader(object): 824class ConnectionReader(object):
702 825
@@ -711,7 +834,7 @@ class ConnectionReader(object):
711 return self.reader.poll(timeout) 834 return self.reader.poll(timeout)
712 835
713 def get(self): 836 def get(self):
714 with self.rlock: 837 with bb.utils.lock_timeout(self.rlock):
715 res = self.reader.recv_bytes() 838 res = self.reader.recv_bytes()
716 return multiprocessing.reduction.ForkingPickler.loads(res) 839 return multiprocessing.reduction.ForkingPickler.loads(res)
717 840
@@ -730,10 +853,31 @@ class ConnectionWriter(object):
730 # Why bb.event needs this I have no idea 853 # Why bb.event needs this I have no idea
731 self.event = self 854 self.event = self
732 855
856 def _send(self, obj):
857 gc.disable()
858 with bb.utils.lock_timeout(self.wlock):
859 self.writer.send_bytes(obj)
860 gc.enable()
861
733 def send(self, obj): 862 def send(self, obj):
734 obj = multiprocessing.reduction.ForkingPickler.dumps(obj) 863 obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
735 with self.wlock: 864 # See notes/code in CookerParser
736 self.writer.send_bytes(obj) 865 # We must not terminate holding this lock else processes will hang.
866 # For SIGTERM, raising afterwards avoids this.
867 # For SIGINT, we don't want to have written partial data to the pipe.
868 # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
869 process = multiprocessing.current_process()
870 if process and hasattr(process, "queue_signals"):
871 with bb.utils.lock_timeout(process.signal_threadlock):
872 process.queue_signals = True
873 self._send(obj)
874 process.queue_signals = False
875
876 while len(process.signal_received) > 0:
877 sig = process.signal_received.pop()
878 process.handle_sig(sig, None)
879 else:
880 self._send(obj)
737 881
738 def fileno(self): 882 def fileno(self):
739 return self.writer.fileno() 883 return self.writer.fileno()