summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/server/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r--bitbake/lib/bb/server/process.py380
1 files changed, 267 insertions, 113 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py
index b27b4aefe0..76b189291d 100644
--- a/bitbake/lib/bb/server/process.py
+++ b/bitbake/lib/bb/server/process.py
@@ -26,6 +26,9 @@ import errno
26import re 26import re
27import datetime 27import datetime
28import pickle 28import pickle
29import traceback
30import gc
31import stat
29import bb.server.xmlrpcserver 32import bb.server.xmlrpcserver
30from bb import daemonize 33from bb import daemonize
31from multiprocessing import queues 34from multiprocessing import queues
@@ -35,9 +38,46 @@ logger = logging.getLogger('BitBake')
35class ProcessTimeout(SystemExit): 38class ProcessTimeout(SystemExit):
36 pass 39 pass
37 40
41def currenttime():
42 return datetime.datetime.now().strftime('%H:%M:%S.%f')
43
38def serverlog(msg): 44def serverlog(msg):
39 print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg) 45 print(str(os.getpid()) + " " + currenttime() + " " + msg)
40 sys.stdout.flush() 46 #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server
47 #sys.stdout.flush()
48
49#
50# When we have lockfile issues, try and find infomation about which process is
51# using the lockfile
52#
53def get_lockfile_process_msg(lockfile):
54 # Some systems may not have lsof available
55 procs = None
56 try:
57 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
58 except subprocess.CalledProcessError:
59 # File was deleted?
60 pass
61 except OSError as e:
62 if e.errno != errno.ENOENT:
63 raise
64 if procs is None:
65 # Fall back to fuser if lsof is unavailable
66 try:
67 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
68 except subprocess.CalledProcessError:
69 # File was deleted?
70 pass
71 except OSError as e:
72 if e.errno != errno.ENOENT:
73 raise
74 if procs:
75 return procs.decode("utf-8")
76 return None
77
78class idleFinish():
79 def __init__(self, msg):
80 self.msg = msg
41 81
42class ProcessServer(): 82class ProcessServer():
43 profile_filename = "profile.log" 83 profile_filename = "profile.log"
@@ -56,12 +96,19 @@ class ProcessServer():
56 self.maxuiwait = 30 96 self.maxuiwait = 30
57 self.xmlrpc = False 97 self.xmlrpc = False
58 98
99 self.idle = None
100 # Need a lock for _idlefuns changes
59 self._idlefuns = {} 101 self._idlefuns = {}
102 self._idlefuncsLock = threading.Lock()
103 self.idle_cond = threading.Condition(self._idlefuncsLock)
60 104
61 self.bitbake_lock = lock 105 self.bitbake_lock = lock
62 self.bitbake_lock_name = lockname 106 self.bitbake_lock_name = lockname
63 self.sock = sock 107 self.sock = sock
64 self.sockname = sockname 108 self.sockname = sockname
109 # It is possible the directory may be renamed. Cache the inode of the socket file
110 # so we can tell if things changed.
111 self.sockinode = os.stat(self.sockname)[stat.ST_INO]
65 112
66 self.server_timeout = server_timeout 113 self.server_timeout = server_timeout
67 self.timeout = self.server_timeout 114 self.timeout = self.server_timeout
@@ -70,7 +117,9 @@ class ProcessServer():
70 def register_idle_function(self, function, data): 117 def register_idle_function(self, function, data):
71 """Register a function to be called while the server is idle""" 118 """Register a function to be called while the server is idle"""
72 assert hasattr(function, '__call__') 119 assert hasattr(function, '__call__')
73 self._idlefuns[function] = data 120 with bb.utils.lock_timeout(self._idlefuncsLock):
121 self._idlefuns[function] = data
122 serverlog("Registering idle function %s" % str(function))
74 123
75 def run(self): 124 def run(self):
76 125
@@ -109,6 +158,31 @@ class ProcessServer():
109 158
110 return ret 159 return ret
111 160
161 def _idle_check(self):
162 return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None
163
164 def wait_for_idle(self, timeout=30):
165 # Wait for the idle loop to have cleared
166 with bb.utils.lock_timeout(self._idlefuncsLock):
167 return self.idle_cond.wait_for(self._idle_check, timeout) is not False
168
169 def set_async_cmd(self, cmd):
170 with bb.utils.lock_timeout(self._idlefuncsLock):
171 ret = self.idle_cond.wait_for(self._idle_check, 30)
172 if ret is False:
173 return False
174 self.cooker.command.currentAsyncCommand = cmd
175 return True
176
177 def clear_async_cmd(self):
178 with bb.utils.lock_timeout(self._idlefuncsLock):
179 self.cooker.command.currentAsyncCommand = None
180 self.idle_cond.notify_all()
181
182 def get_async_cmd(self):
183 with bb.utils.lock_timeout(self._idlefuncsLock):
184 return self.cooker.command.currentAsyncCommand
185
112 def main(self): 186 def main(self):
113 self.cooker.pre_serve() 187 self.cooker.pre_serve()
114 188
@@ -123,14 +197,19 @@ class ProcessServer():
123 fds.append(self.xmlrpc) 197 fds.append(self.xmlrpc)
124 seendata = False 198 seendata = False
125 serverlog("Entering server connection loop") 199 serverlog("Entering server connection loop")
200 serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))
126 201
127 def disconnect_client(self, fds): 202 def disconnect_client(self, fds):
128 serverlog("Disconnecting Client") 203 serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
129 if self.controllersock: 204 if self.controllersock:
130 fds.remove(self.controllersock) 205 fds.remove(self.controllersock)
131 self.controllersock.close() 206 self.controllersock.close()
132 self.controllersock = False 207 self.controllersock = False
133 if self.haveui: 208 if self.haveui:
209 # Wait for the idle loop to have cleared (30s max)
210 if not self.wait_for_idle(30):
211 serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
212 self.quit = True
134 fds.remove(self.command_channel) 213 fds.remove(self.command_channel)
135 bb.event.unregister_UIHhandler(self.event_handle, True) 214 bb.event.unregister_UIHhandler(self.event_handle, True)
136 self.command_channel_reply.writer.close() 215 self.command_channel_reply.writer.close()
@@ -142,12 +221,12 @@ class ProcessServer():
142 self.cooker.clientComplete() 221 self.cooker.clientComplete()
143 self.haveui = False 222 self.haveui = False
144 ready = select.select(fds,[],[],0)[0] 223 ready = select.select(fds,[],[],0)[0]
145 if newconnections: 224 if newconnections and not self.quit:
146 serverlog("Starting new client") 225 serverlog("Starting new client")
147 conn = newconnections.pop(-1) 226 conn = newconnections.pop(-1)
148 fds.append(conn) 227 fds.append(conn)
149 self.controllersock = conn 228 self.controllersock = conn
150 elif self.timeout is None and not ready: 229 elif not self.timeout and not ready:
151 serverlog("No timeout, exiting.") 230 serverlog("No timeout, exiting.")
152 self.quit = True 231 self.quit = True
153 232
@@ -214,11 +293,14 @@ class ProcessServer():
214 continue 293 continue
215 try: 294 try:
216 serverlog("Running command %s" % command) 295 serverlog("Running command %s" % command)
217 self.command_channel_reply.send(self.cooker.command.runCommand(command)) 296 reply = self.cooker.command.runCommand(command, self)
218 serverlog("Command Completed") 297 serverlog("Sending reply %s" % repr(reply))
298 self.command_channel_reply.send(reply)
299 serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
219 except Exception as e: 300 except Exception as e:
220 serverlog('Exception in server main event loop running command %s (%s)' % (command, str(e))) 301 stack = traceback.format_exc()
221 logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e))) 302 serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
303 logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))
222 304
223 if self.xmlrpc in ready: 305 if self.xmlrpc in ready:
224 self.xmlrpc.handle_requests() 306 self.xmlrpc.handle_requests()
@@ -241,19 +323,25 @@ class ProcessServer():
241 323
242 ready = self.idle_commands(.1, fds) 324 ready = self.idle_commands(.1, fds)
243 325
244 if len(threading.enumerate()) != 1: 326 if self.idle:
245 serverlog("More than one thread left?: " + str(threading.enumerate())) 327 self.idle.join()
246 328
247 serverlog("Exiting") 329 serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
248 # Remove the socket file so we don't get any more connections to avoid races 330 # Remove the socket file so we don't get any more connections to avoid races
331 # The build directory could have been renamed so if the file isn't the one we created
332 # we shouldn't delete it.
249 try: 333 try:
250 os.unlink(self.sockname) 334 sockinode = os.stat(self.sockname)[stat.ST_INO]
251 except: 335 if sockinode == self.sockinode:
252 pass 336 os.unlink(self.sockname)
337 else:
338 serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
339 except Exception as err:
340 serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
253 self.sock.close() 341 self.sock.close()
254 342
255 try: 343 try:
256 self.cooker.shutdown(True) 344 self.cooker.shutdown(True, idle=False)
257 self.cooker.notifier.stop() 345 self.cooker.notifier.stop()
258 self.cooker.confignotifier.stop() 346 self.cooker.confignotifier.stop()
259 except: 347 except:
@@ -261,6 +349,9 @@ class ProcessServer():
261 349
262 self.cooker.post_serve() 350 self.cooker.post_serve()
263 351
352 if len(threading.enumerate()) != 1:
353 serverlog("More than one thread left?: " + str(threading.enumerate()))
354
264 # Flush logs before we release the lock 355 # Flush logs before we release the lock
265 sys.stdout.flush() 356 sys.stdout.flush()
266 sys.stderr.flush() 357 sys.stderr.flush()
@@ -276,20 +367,21 @@ class ProcessServer():
276 except FileNotFoundError: 367 except FileNotFoundError:
277 return None 368 return None
278 369
279 lockcontents = get_lock_contents(lockfile)
280 serverlog("Original lockfile contents: " + str(lockcontents))
281
282 lock.close() 370 lock.close()
283 lock = None 371 lock = None
284 372
285 while not lock: 373 while not lock:
286 i = 0 374 i = 0
287 lock = None 375 lock = None
376 if not os.path.exists(os.path.basename(lockfile)):
377 serverlog("Lockfile directory gone, exiting.")
378 return
379
288 while not lock and i < 30: 380 while not lock and i < 30:
289 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) 381 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
290 if not lock: 382 if not lock:
291 newlockcontents = get_lock_contents(lockfile) 383 newlockcontents = get_lock_contents(lockfile)
292 if newlockcontents != lockcontents: 384 if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]):
293 # A new server was started, the lockfile contents changed, we can exit 385 # A new server was started, the lockfile contents changed, we can exit
294 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) 386 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
295 return 387 return
@@ -303,75 +395,108 @@ class ProcessServer():
303 return 395 return
304 396
305 if not lock: 397 if not lock:
306 # Some systems may not have lsof available 398 procs = get_lockfile_process_msg(lockfile)
307 procs = None 399 msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
400 if procs:
401 msg.append(":\n%s" % procs)
402 serverlog("".join(msg))
403
404 def idle_thread(self):
405 if self.cooker.configuration.profile:
406 try:
407 import cProfile as profile
408 except:
409 import profile
410 prof = profile.Profile()
411
412 ret = profile.Profile.runcall(prof, self.idle_thread_internal)
413
414 prof.dump_stats("profile-mainloop.log")
415 bb.utils.process_profilelog("profile-mainloop.log")
416 serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed")
417 else:
418 self.idle_thread_internal()
419
420 def idle_thread_internal(self):
421 def remove_idle_func(function):
422 with bb.utils.lock_timeout(self._idlefuncsLock):
423 del self._idlefuns[function]
424 self.idle_cond.notify_all()
425
426 while not self.quit:
427 nextsleep = 0.1
428 fds = []
429
430 with bb.utils.lock_timeout(self._idlefuncsLock):
431 items = list(self._idlefuns.items())
432
433 for function, data in items:
308 try: 434 try:
309 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) 435 retval = function(self, data, False)
310 except subprocess.CalledProcessError: 436 if isinstance(retval, idleFinish):
311 # File was deleted? 437 serverlog("Removing idle function %s at idleFinish" % str(function))
312 continue 438 remove_idle_func(function)
313 except OSError as e: 439 self.cooker.command.finishAsyncCommand(retval.msg)
314 if e.errno != errno.ENOENT: 440 nextsleep = None
315 raise 441 elif retval is False:
316 if procs is None: 442 serverlog("Removing idle function %s" % str(function))
317 # Fall back to fuser if lsof is unavailable 443 remove_idle_func(function)
318 try: 444 nextsleep = None
319 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) 445 elif retval is True:
320 except subprocess.CalledProcessError: 446 nextsleep = None
321 # File was deleted? 447 elif isinstance(retval, float) and nextsleep:
448 if (retval < nextsleep):
449 nextsleep = retval
450 elif nextsleep is None:
322 continue 451 continue
323 except OSError as e: 452 else:
324 if e.errno != errno.ENOENT: 453 fds = fds + retval
325 raise 454 except SystemExit:
455 raise
456 except Exception as exc:
457 if not isinstance(exc, bb.BBHandledException):
458 logger.exception('Running idle function')
459 remove_idle_func(function)
460 serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
461 self.quit = True
326 462
327 msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock" 463 # Create new heartbeat event?
328 if procs: 464 now = time.time()
329 msg += ":\n%s" % str(procs.decode("utf-8")) 465 if bb.event._heartbeat_enabled and now >= self.next_heartbeat:
330 serverlog(msg) 466 # We might have missed heartbeats. Just trigger once in
467 # that case and continue after the usual delay.
468 self.next_heartbeat += self.heartbeat_seconds
469 if self.next_heartbeat <= now:
470 self.next_heartbeat = now + self.heartbeat_seconds
471 if hasattr(self.cooker, "data"):
472 heartbeat = bb.event.HeartbeatEvent(now)
473 try:
474 bb.event.fire(heartbeat, self.cooker.data)
475 except Exception as exc:
476 if not isinstance(exc, bb.BBHandledException):
477 logger.exception('Running heartbeat function')
478 serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
479 self.quit = True
480 if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
481 # Shorten timeout so that we we wake up in time for
482 # the heartbeat.
483 nextsleep = self.next_heartbeat - now
484
485 if nextsleep is not None:
486 select.select(fds,[],[],nextsleep)[0]
331 487
332 def idle_commands(self, delay, fds=None): 488 def idle_commands(self, delay, fds=None):
333 nextsleep = delay 489 nextsleep = delay
334 if not fds: 490 if not fds:
335 fds = [] 491 fds = []
336 492
337 for function, data in list(self._idlefuns.items()): 493 if not self.idle:
338 try: 494 self.idle = threading.Thread(target=self.idle_thread)
339 retval = function(self, data, False) 495 self.idle.start()
340 if retval is False: 496 elif self.idle and not self.idle.is_alive():
341 del self._idlefuns[function] 497 serverlog("Idle thread terminated, main thread exiting too")
342 nextsleep = None 498 bb.error("Idle thread terminated, main thread exiting too")
343 elif retval is True: 499 self.quit = True
344 nextsleep = None
345 elif isinstance(retval, float) and nextsleep:
346 if (retval < nextsleep):
347 nextsleep = retval
348 elif nextsleep is None:
349 continue
350 else:
351 fds = fds + retval
352 except SystemExit:
353 raise
354 except Exception as exc:
355 if not isinstance(exc, bb.BBHandledException):
356 logger.exception('Running idle function')
357 del self._idlefuns[function]
358 self.quit = True
359
360 # Create new heartbeat event?
361 now = time.time()
362 if now >= self.next_heartbeat:
363 # We might have missed heartbeats. Just trigger once in
364 # that case and continue after the usual delay.
365 self.next_heartbeat += self.heartbeat_seconds
366 if self.next_heartbeat <= now:
367 self.next_heartbeat = now + self.heartbeat_seconds
368 if hasattr(self.cooker, "data"):
369 heartbeat = bb.event.HeartbeatEvent(now)
370 bb.event.fire(heartbeat, self.cooker.data)
371 if nextsleep and now + nextsleep > self.next_heartbeat:
372 # Shorten timeout so that we we wake up in time for
373 # the heartbeat.
374 nextsleep = self.next_heartbeat - now
375 500
376 if nextsleep is not None: 501 if nextsleep is not None:
377 if self.xmlrpc: 502 if self.xmlrpc:
@@ -391,12 +516,18 @@ class ServerCommunicator():
391 self.recv = recv 516 self.recv = recv
392 517
393 def runCommand(self, command): 518 def runCommand(self, command):
394 self.connection.send(command) 519 try:
520 self.connection.send(command)
521 except BrokenPipeError as e:
522 raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
395 if not self.recv.poll(30): 523 if not self.recv.poll(30):
396 logger.info("No reply from server in 30s") 524 logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
397 if not self.recv.poll(30): 525 if not self.recv.poll(30):
398 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)") 526 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())
399 ret, exc = self.recv.get() 527 try:
528 ret, exc = self.recv.get()
529 except EOFError as e:
530 raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
400 # Should probably turn all exceptions in exc back into exceptions? 531 # Should probably turn all exceptions in exc back into exceptions?
401 # For now, at least handle BBHandledException 532 # For now, at least handle BBHandledException
402 if exc and ("BBHandledException" in exc or "SystemExit" in exc): 533 if exc and ("BBHandledException" in exc or "SystemExit" in exc):
@@ -429,6 +560,7 @@ class BitBakeProcessServerConnection(object):
429 self.socket_connection = sock 560 self.socket_connection = sock
430 561
431 def terminate(self): 562 def terminate(self):
563 self.events.close()
432 self.socket_connection.close() 564 self.socket_connection.close()
433 self.connection.connection.close() 565 self.connection.connection.close()
434 self.connection.recv.close() 566 self.connection.recv.close()
@@ -439,13 +571,14 @@ start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
439 571
440class BitBakeServer(object): 572class BitBakeServer(object):
441 573
442 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface): 574 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
443 575
444 self.server_timeout = server_timeout 576 self.server_timeout = server_timeout
445 self.xmlrpcinterface = xmlrpcinterface 577 self.xmlrpcinterface = xmlrpcinterface
446 self.featureset = featureset 578 self.featureset = featureset
447 self.sockname = sockname 579 self.sockname = sockname
448 self.bitbake_lock = lock 580 self.bitbake_lock = lock
581 self.profile = profile
449 self.readypipe, self.readypipein = os.pipe() 582 self.readypipe, self.readypipein = os.pipe()
450 583
451 # Place the log in the builddirectory alongside the lock file 584 # Place the log in the builddirectory alongside the lock file
@@ -466,7 +599,7 @@ class BitBakeServer(object):
466 try: 599 try:
467 r = ready.get() 600 r = ready.get()
468 except EOFError: 601 except EOFError:
469 # Trap the child exitting/closing the pipe and error out 602 # Trap the child exiting/closing the pipe and error out
470 r = None 603 r = None
471 if not r or r[0] != "r": 604 if not r or r[0] != "r":
472 ready.close() 605 ready.close()
@@ -509,9 +642,9 @@ class BitBakeServer(object):
509 os.set_inheritable(self.bitbake_lock.fileno(), True) 642 os.set_inheritable(self.bitbake_lock.fileno(), True)
510 os.set_inheritable(self.readypipein, True) 643 os.set_inheritable(self.readypipein, True)
511 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") 644 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
512 os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) 645 os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
513 646
514def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface): 647def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
515 648
516 import bb.cookerdata 649 import bb.cookerdata
517 import bb.cooker 650 import bb.cooker
@@ -523,6 +656,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
523 656
524 # Create server control socket 657 # Create server control socket
525 if os.path.exists(sockname): 658 if os.path.exists(sockname):
659 serverlog("WARNING: removing existing socket file '%s'" % sockname)
526 os.unlink(sockname) 660 os.unlink(sockname)
527 661
528 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 662 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -539,7 +673,8 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
539 writer = ConnectionWriter(readypipeinfd) 673 writer = ConnectionWriter(readypipeinfd)
540 try: 674 try:
541 featureset = [] 675 featureset = []
542 cooker = bb.cooker.BBCooker(featureset, server.register_idle_function) 676 cooker = bb.cooker.BBCooker(featureset, server)
677 cooker.configuration.profile = profile
543 except bb.BBHandledException: 678 except bb.BBHandledException:
544 return None 679 return None
545 writer.send("r") 680 writer.send("r")
@@ -549,7 +684,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
549 684
550 server.run() 685 server.run()
551 finally: 686 finally:
552 # Flush any ,essages/errors to the logfile before exit 687 # Flush any messages/errors to the logfile before exit
553 sys.stdout.flush() 688 sys.stdout.flush()
554 sys.stderr.flush() 689 sys.stderr.flush()
555 690
@@ -654,23 +789,18 @@ class BBUIEventQueue:
654 self.reader = ConnectionReader(readfd) 789 self.reader = ConnectionReader(readfd)
655 790
656 self.t = threading.Thread() 791 self.t = threading.Thread()
657 self.t.setDaemon(True)
658 self.t.run = self.startCallbackHandler 792 self.t.run = self.startCallbackHandler
659 self.t.start() 793 self.t.start()
660 794
661 def getEvent(self): 795 def getEvent(self):
662 self.eventQueueLock.acquire() 796 with bb.utils.lock_timeout(self.eventQueueLock):
663 797 if len(self.eventQueue) == 0:
664 if len(self.eventQueue) == 0: 798 return None
665 self.eventQueueLock.release()
666 return None
667
668 item = self.eventQueue.pop(0)
669 799
670 if len(self.eventQueue) == 0: 800 item = self.eventQueue.pop(0)
671 self.eventQueueNotify.clear() 801 if len(self.eventQueue) == 0:
802 self.eventQueueNotify.clear()
672 803
673 self.eventQueueLock.release()
674 return item 804 return item
675 805
676 def waitEvent(self, delay): 806 def waitEvent(self, delay):
@@ -678,10 +808,9 @@ class BBUIEventQueue:
678 return self.getEvent() 808 return self.getEvent()
679 809
680 def queue_event(self, event): 810 def queue_event(self, event):
681 self.eventQueueLock.acquire() 811 with bb.utils.lock_timeout(self.eventQueueLock):
682 self.eventQueue.append(event) 812 self.eventQueue.append(event)
683 self.eventQueueNotify.set() 813 self.eventQueueNotify.set()
684 self.eventQueueLock.release()
685 814
686 def send_event(self, event): 815 def send_event(self, event):
687 self.queue_event(pickle.loads(event)) 816 self.queue_event(pickle.loads(event))
@@ -690,13 +819,17 @@ class BBUIEventQueue:
690 bb.utils.set_process_name("UIEventQueue") 819 bb.utils.set_process_name("UIEventQueue")
691 while True: 820 while True:
692 try: 821 try:
693 self.reader.wait() 822 ready = self.reader.wait(0.25)
694 event = self.reader.get() 823 if ready:
695 self.queue_event(event) 824 event = self.reader.get()
696 except EOFError: 825 self.queue_event(event)
826 except (EOFError, OSError, TypeError):
697 # Easiest way to exit is to close the file descriptor to cause an exit 827 # Easiest way to exit is to close the file descriptor to cause an exit
698 break 828 break
829
830 def close(self):
699 self.reader.close() 831 self.reader.close()
832 self.t.join()
700 833
701class ConnectionReader(object): 834class ConnectionReader(object):
702 835
@@ -711,7 +844,7 @@ class ConnectionReader(object):
711 return self.reader.poll(timeout) 844 return self.reader.poll(timeout)
712 845
713 def get(self): 846 def get(self):
714 with self.rlock: 847 with bb.utils.lock_timeout(self.rlock):
715 res = self.reader.recv_bytes() 848 res = self.reader.recv_bytes()
716 return multiprocessing.reduction.ForkingPickler.loads(res) 849 return multiprocessing.reduction.ForkingPickler.loads(res)
717 850
@@ -730,10 +863,31 @@ class ConnectionWriter(object):
730 # Why bb.event needs this I have no idea 863 # Why bb.event needs this I have no idea
731 self.event = self 864 self.event = self
732 865
866 def _send(self, obj):
867 gc.disable()
868 with bb.utils.lock_timeout(self.wlock):
869 self.writer.send_bytes(obj)
870 gc.enable()
871
733 def send(self, obj): 872 def send(self, obj):
734 obj = multiprocessing.reduction.ForkingPickler.dumps(obj) 873 obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
735 with self.wlock: 874 # See notes/code in CookerParser
736 self.writer.send_bytes(obj) 875 # We must not terminate holding this lock else processes will hang.
876 # For SIGTERM, raising afterwards avoids this.
877 # For SIGINT, we don't want to have written partial data to the pipe.
878 # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
879 process = multiprocessing.current_process()
880 if process and hasattr(process, "queue_signals"):
881 with bb.utils.lock_timeout(process.signal_threadlock):
882 process.queue_signals = True
883 self._send(obj)
884 process.queue_signals = False
885
886 while len(process.signal_received) > 0:
887 sig = process.signal_received.pop()
888 process.handle_sig(sig, None)
889 else:
890 self._send(obj)
737 891
738 def fileno(self): 892 def fileno(self):
739 return self.writer.fileno() 893 return self.writer.fileno()