summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/server/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r--bitbake/lib/bb/server/process.py404
1 files changed, 258 insertions, 146 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py
index b27b4aefe0..d0f73590cc 100644
--- a/bitbake/lib/bb/server/process.py
+++ b/bitbake/lib/bb/server/process.py
@@ -13,7 +13,7 @@
13import bb 13import bb
14import bb.event 14import bb.event
15import logging 15import logging
16import multiprocessing 16from bb import multiprocessing
17import threading 17import threading
18import array 18import array
19import os 19import os
@@ -26,6 +26,9 @@ import errno
26import re 26import re
27import datetime 27import datetime
28import pickle 28import pickle
29import traceback
30import gc
31import stat
29import bb.server.xmlrpcserver 32import bb.server.xmlrpcserver
30from bb import daemonize 33from bb import daemonize
31from multiprocessing import queues 34from multiprocessing import queues
@@ -35,14 +38,48 @@ logger = logging.getLogger('BitBake')
35class ProcessTimeout(SystemExit): 38class ProcessTimeout(SystemExit):
36 pass 39 pass
37 40
41def currenttime():
42 return datetime.datetime.now().strftime('%H:%M:%S.%f')
43
38def serverlog(msg): 44def serverlog(msg):
39 print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg) 45 print(str(os.getpid()) + " " + currenttime() + " " + msg)
40 sys.stdout.flush() 46 #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server
47 #sys.stdout.flush()
41 48
42class ProcessServer(): 49#
43 profile_filename = "profile.log" 50# When we have lockfile issues, try and find infomation about which process is
44 profile_processed_filename = "profile.log.processed" 51# using the lockfile
52#
53def get_lockfile_process_msg(lockfile):
54 # Some systems may not have lsof available
55 procs = None
56 try:
57 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
58 except subprocess.CalledProcessError:
59 # File was deleted?
60 pass
61 except OSError as e:
62 if e.errno != errno.ENOENT:
63 raise
64 if procs is None:
65 # Fall back to fuser if lsof is unavailable
66 try:
67 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
68 except subprocess.CalledProcessError:
69 # File was deleted?
70 pass
71 except OSError as e:
72 if e.errno != errno.ENOENT:
73 raise
74 if procs:
75 return procs.decode("utf-8")
76 return None
77
78class idleFinish():
79 def __init__(self, msg):
80 self.msg = msg
45 81
82class ProcessServer():
46 def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface): 83 def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
47 self.command_channel = False 84 self.command_channel = False
48 self.command_channel_reply = False 85 self.command_channel_reply = False
@@ -56,12 +93,19 @@ class ProcessServer():
56 self.maxuiwait = 30 93 self.maxuiwait = 30
57 self.xmlrpc = False 94 self.xmlrpc = False
58 95
96 self.idle = None
97 # Need a lock for _idlefuns changes
59 self._idlefuns = {} 98 self._idlefuns = {}
99 self._idlefuncsLock = threading.Lock()
100 self.idle_cond = threading.Condition(self._idlefuncsLock)
60 101
61 self.bitbake_lock = lock 102 self.bitbake_lock = lock
62 self.bitbake_lock_name = lockname 103 self.bitbake_lock_name = lockname
63 self.sock = sock 104 self.sock = sock
64 self.sockname = sockname 105 self.sockname = sockname
106 # It is possible the directory may be renamed. Cache the inode of the socket file
107 # so we can tell if things changed.
108 self.sockinode = os.stat(self.sockname)[stat.ST_INO]
65 109
66 self.server_timeout = server_timeout 110 self.server_timeout = server_timeout
67 self.timeout = self.server_timeout 111 self.timeout = self.server_timeout
@@ -70,7 +114,9 @@ class ProcessServer():
70 def register_idle_function(self, function, data): 114 def register_idle_function(self, function, data):
71 """Register a function to be called while the server is idle""" 115 """Register a function to be called while the server is idle"""
72 assert hasattr(function, '__call__') 116 assert hasattr(function, '__call__')
73 self._idlefuns[function] = data 117 with bb.utils.lock_timeout(self._idlefuncsLock):
118 self._idlefuns[function] = data
119 serverlog("Registering idle function %s" % str(function))
74 120
75 def run(self): 121 def run(self):
76 122
@@ -91,23 +137,32 @@ class ProcessServer():
91 serverlog("Error writing to lock file: %s" % str(e)) 137 serverlog("Error writing to lock file: %s" % str(e))
92 pass 138 pass
93 139
94 if self.cooker.configuration.profile: 140 return bb.utils.profile_function("main" in self.cooker.configuration.profile, self.main, "profile-mainloop.log")
95 try:
96 import cProfile as profile
97 except:
98 import profile
99 prof = profile.Profile()
100 141
101 ret = profile.Profile.runcall(prof, self.main) 142 def _idle_check(self):
143 return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None
102 144
103 prof.dump_stats("profile.log") 145 def wait_for_idle(self, timeout=30):
104 bb.utils.process_profilelog("profile.log") 146 # Wait for the idle loop to have cleared
105 serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed") 147 with bb.utils.lock_timeout(self._idlefuncsLock):
148 return self.idle_cond.wait_for(self._idle_check, timeout) is not False
106 149
107 else: 150 def set_async_cmd(self, cmd):
108 ret = self.main() 151 with bb.utils.lock_timeout(self._idlefuncsLock):
152 ret = self.idle_cond.wait_for(self._idle_check, 30)
153 if ret is False:
154 return False
155 self.cooker.command.currentAsyncCommand = cmd
156 return True
109 157
110 return ret 158 def clear_async_cmd(self):
159 with bb.utils.lock_timeout(self._idlefuncsLock):
160 self.cooker.command.currentAsyncCommand = None
161 self.idle_cond.notify_all()
162
163 def get_async_cmd(self):
164 with bb.utils.lock_timeout(self._idlefuncsLock):
165 return self.cooker.command.currentAsyncCommand
111 166
112 def main(self): 167 def main(self):
113 self.cooker.pre_serve() 168 self.cooker.pre_serve()
@@ -123,14 +178,19 @@ class ProcessServer():
123 fds.append(self.xmlrpc) 178 fds.append(self.xmlrpc)
124 seendata = False 179 seendata = False
125 serverlog("Entering server connection loop") 180 serverlog("Entering server connection loop")
181 serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))
126 182
127 def disconnect_client(self, fds): 183 def disconnect_client(self, fds):
128 serverlog("Disconnecting Client") 184 serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
129 if self.controllersock: 185 if self.controllersock:
130 fds.remove(self.controllersock) 186 fds.remove(self.controllersock)
131 self.controllersock.close() 187 self.controllersock.close()
132 self.controllersock = False 188 self.controllersock = False
133 if self.haveui: 189 if self.haveui:
190 # Wait for the idle loop to have cleared (30s max)
191 if not self.wait_for_idle(30):
192 serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
193 self.quit = True
134 fds.remove(self.command_channel) 194 fds.remove(self.command_channel)
135 bb.event.unregister_UIHhandler(self.event_handle, True) 195 bb.event.unregister_UIHhandler(self.event_handle, True)
136 self.command_channel_reply.writer.close() 196 self.command_channel_reply.writer.close()
@@ -142,12 +202,12 @@ class ProcessServer():
142 self.cooker.clientComplete() 202 self.cooker.clientComplete()
143 self.haveui = False 203 self.haveui = False
144 ready = select.select(fds,[],[],0)[0] 204 ready = select.select(fds,[],[],0)[0]
145 if newconnections: 205 if newconnections and not self.quit:
146 serverlog("Starting new client") 206 serverlog("Starting new client")
147 conn = newconnections.pop(-1) 207 conn = newconnections.pop(-1)
148 fds.append(conn) 208 fds.append(conn)
149 self.controllersock = conn 209 self.controllersock = conn
150 elif self.timeout is None and not ready: 210 elif not self.timeout and not ready:
151 serverlog("No timeout, exiting.") 211 serverlog("No timeout, exiting.")
152 self.quit = True 212 self.quit = True
153 213
@@ -214,11 +274,14 @@ class ProcessServer():
214 continue 274 continue
215 try: 275 try:
216 serverlog("Running command %s" % command) 276 serverlog("Running command %s" % command)
217 self.command_channel_reply.send(self.cooker.command.runCommand(command)) 277 reply = self.cooker.command.runCommand(command, self)
218 serverlog("Command Completed") 278 serverlog("Sending reply %s" % repr(reply))
279 self.command_channel_reply.send(reply)
280 serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
219 except Exception as e: 281 except Exception as e:
220 serverlog('Exception in server main event loop running command %s (%s)' % (command, str(e))) 282 stack = traceback.format_exc()
221 logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e))) 283 serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
284 logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))
222 285
223 if self.xmlrpc in ready: 286 if self.xmlrpc in ready:
224 self.xmlrpc.handle_requests() 287 self.xmlrpc.handle_requests()
@@ -239,21 +302,42 @@ class ProcessServer():
239 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) 302 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
240 seendata = True 303 seendata = True
241 304
242 ready = self.idle_commands(.1, fds) 305 if not self.idle:
306 self.idle = threading.Thread(target=self.idle_thread)
307 self.idle.start()
308 elif self.idle and not self.idle.is_alive():
309 serverlog("Idle thread terminated, main thread exiting too")
310 bb.error("Idle thread terminated, main thread exiting too")
311 self.quit = True
243 312
244 if len(threading.enumerate()) != 1: 313 nextsleep = 1.0
245 serverlog("More than one thread left?: " + str(threading.enumerate())) 314 if self.xmlrpc:
315 nextsleep = self.xmlrpc.get_timeout(nextsleep)
316 try:
317 ready = select.select(fds,[],[],nextsleep)[0]
318 except InterruptedError:
319 # Ignore EINTR
320 ready = []
321
322 if self.idle:
323 self.idle.join()
246 324
247 serverlog("Exiting") 325 serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
248 # Remove the socket file so we don't get any more connections to avoid races 326 # Remove the socket file so we don't get any more connections to avoid races
327 # The build directory could have been renamed so if the file isn't the one we created
328 # we shouldn't delete it.
249 try: 329 try:
250 os.unlink(self.sockname) 330 sockinode = os.stat(self.sockname)[stat.ST_INO]
251 except: 331 if sockinode == self.sockinode:
252 pass 332 os.unlink(self.sockname)
333 else:
334 serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
335 except Exception as err:
336 serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
253 self.sock.close() 337 self.sock.close()
254 338
255 try: 339 try:
256 self.cooker.shutdown(True) 340 self.cooker.shutdown(True, idle=False)
257 self.cooker.notifier.stop() 341 self.cooker.notifier.stop()
258 self.cooker.confignotifier.stop() 342 self.cooker.confignotifier.stop()
259 except: 343 except:
@@ -261,6 +345,9 @@ class ProcessServer():
261 345
262 self.cooker.post_serve() 346 self.cooker.post_serve()
263 347
348 if len(threading.enumerate()) != 1:
349 serverlog("More than one thread left?: " + str(threading.enumerate()))
350
264 # Flush logs before we release the lock 351 # Flush logs before we release the lock
265 sys.stdout.flush() 352 sys.stdout.flush()
266 sys.stderr.flush() 353 sys.stderr.flush()
@@ -276,20 +363,21 @@ class ProcessServer():
276 except FileNotFoundError: 363 except FileNotFoundError:
277 return None 364 return None
278 365
279 lockcontents = get_lock_contents(lockfile)
280 serverlog("Original lockfile contents: " + str(lockcontents))
281
282 lock.close() 366 lock.close()
283 lock = None 367 lock = None
284 368
285 while not lock: 369 while not lock:
286 i = 0 370 i = 0
287 lock = None 371 lock = None
372 if not os.path.exists(os.path.basename(lockfile)):
373 serverlog("Lockfile directory gone, exiting.")
374 return
375
288 while not lock and i < 30: 376 while not lock and i < 30:
289 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) 377 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
290 if not lock: 378 if not lock:
291 newlockcontents = get_lock_contents(lockfile) 379 newlockcontents = get_lock_contents(lockfile)
292 if newlockcontents != lockcontents: 380 if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]):
293 # A new server was started, the lockfile contents changed, we can exit 381 # A new server was started, the lockfile contents changed, we can exit
294 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) 382 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
295 return 383 return
@@ -303,87 +391,82 @@ class ProcessServer():
303 return 391 return
304 392
305 if not lock: 393 if not lock:
306 # Some systems may not have lsof available 394 procs = get_lockfile_process_msg(lockfile)
307 procs = None 395 msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
308 try:
309 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
310 except subprocess.CalledProcessError:
311 # File was deleted?
312 continue
313 except OSError as e:
314 if e.errno != errno.ENOENT:
315 raise
316 if procs is None:
317 # Fall back to fuser if lsof is unavailable
318 try:
319 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
320 except subprocess.CalledProcessError:
321 # File was deleted?
322 continue
323 except OSError as e:
324 if e.errno != errno.ENOENT:
325 raise
326
327 msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
328 if procs: 396 if procs:
329 msg += ":\n%s" % str(procs.decode("utf-8")) 397 msg.append(":\n%s" % procs)
330 serverlog(msg) 398 serverlog("".join(msg))
331 399
332 def idle_commands(self, delay, fds=None): 400 def idle_thread(self):
333 nextsleep = delay 401 bb.utils.profile_function("idle" in self.cooker.configuration.profile, self.idle_thread_internal, "profile-idleloop.log")
334 if not fds:
335 fds = []
336 402
337 for function, data in list(self._idlefuns.items()): 403 def idle_thread_internal(self):
338 try: 404 def remove_idle_func(function):
339 retval = function(self, data, False) 405 with bb.utils.lock_timeout(self._idlefuncsLock):
340 if retval is False:
341 del self._idlefuns[function]
342 nextsleep = None
343 elif retval is True:
344 nextsleep = None
345 elif isinstance(retval, float) and nextsleep:
346 if (retval < nextsleep):
347 nextsleep = retval
348 elif nextsleep is None:
349 continue
350 else:
351 fds = fds + retval
352 except SystemExit:
353 raise
354 except Exception as exc:
355 if not isinstance(exc, bb.BBHandledException):
356 logger.exception('Running idle function')
357 del self._idlefuns[function] 406 del self._idlefuns[function]
358 self.quit = True 407 self.idle_cond.notify_all()
359 408
360 # Create new heartbeat event? 409 while not self.quit:
361 now = time.time() 410 nextsleep = 1.0
362 if now >= self.next_heartbeat: 411 fds = []
363 # We might have missed heartbeats. Just trigger once in
364 # that case and continue after the usual delay.
365 self.next_heartbeat += self.heartbeat_seconds
366 if self.next_heartbeat <= now:
367 self.next_heartbeat = now + self.heartbeat_seconds
368 if hasattr(self.cooker, "data"):
369 heartbeat = bb.event.HeartbeatEvent(now)
370 bb.event.fire(heartbeat, self.cooker.data)
371 if nextsleep and now + nextsleep > self.next_heartbeat:
372 # Shorten timeout so that we we wake up in time for
373 # the heartbeat.
374 nextsleep = self.next_heartbeat - now
375
376 if nextsleep is not None:
377 if self.xmlrpc:
378 nextsleep = self.xmlrpc.get_timeout(nextsleep)
379 try:
380 return select.select(fds,[],[],nextsleep)[0]
381 except InterruptedError:
382 # Ignore EINTR
383 return []
384 else:
385 return select.select(fds,[],[],0)[0]
386 412
413 with bb.utils.lock_timeout(self._idlefuncsLock):
414 items = list(self._idlefuns.items())
415
416 for function, data in items:
417 try:
418 retval = function(self, data, False)
419 if isinstance(retval, idleFinish):
420 serverlog("Removing idle function %s at idleFinish" % str(function))
421 remove_idle_func(function)
422 self.cooker.command.finishAsyncCommand(retval.msg)
423 nextsleep = None
424 elif retval is False:
425 serverlog("Removing idle function %s" % str(function))
426 remove_idle_func(function)
427 nextsleep = None
428 elif retval is True:
429 nextsleep = None
430 elif isinstance(retval, float) and nextsleep:
431 if (retval < nextsleep):
432 nextsleep = retval
433 elif nextsleep is None:
434 continue
435 else:
436 fds = fds + retval
437 except SystemExit:
438 raise
439 except Exception as exc:
440 if not isinstance(exc, bb.BBHandledException):
441 logger.exception('Running idle function')
442 remove_idle_func(function)
443 serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
444 self.quit = True
445
446 # Create new heartbeat event?
447 now = time.time()
448 if items and bb.event._heartbeat_enabled and now >= self.next_heartbeat:
449 # We might have missed heartbeats. Just trigger once in
450 # that case and continue after the usual delay.
451 self.next_heartbeat += self.heartbeat_seconds
452 if self.next_heartbeat <= now:
453 self.next_heartbeat = now + self.heartbeat_seconds
454 if hasattr(self.cooker, "data"):
455 heartbeat = bb.event.HeartbeatEvent(now)
456 try:
457 bb.event.fire(heartbeat, self.cooker.data)
458 except Exception as exc:
459 if not isinstance(exc, bb.BBHandledException):
460 logger.exception('Running heartbeat function')
461 serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
462 self.quit = True
463 if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
464 # Shorten timeout so that we we wake up in time for
465 # the heartbeat.
466 nextsleep = self.next_heartbeat - now
467
468 if nextsleep is not None:
469 select.select(fds,[],[],nextsleep)[0]
387 470
388class ServerCommunicator(): 471class ServerCommunicator():
389 def __init__(self, connection, recv): 472 def __init__(self, connection, recv):
@@ -391,12 +474,18 @@ class ServerCommunicator():
391 self.recv = recv 474 self.recv = recv
392 475
393 def runCommand(self, command): 476 def runCommand(self, command):
394 self.connection.send(command) 477 try:
478 self.connection.send(command)
479 except BrokenPipeError as e:
480 raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
395 if not self.recv.poll(30): 481 if not self.recv.poll(30):
396 logger.info("No reply from server in 30s") 482 logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
397 if not self.recv.poll(30): 483 if not self.recv.poll(30):
398 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)") 484 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())
399 ret, exc = self.recv.get() 485 try:
486 ret, exc = self.recv.get()
487 except EOFError as e:
488 raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
400 # Should probably turn all exceptions in exc back into exceptions? 489 # Should probably turn all exceptions in exc back into exceptions?
401 # For now, at least handle BBHandledException 490 # For now, at least handle BBHandledException
402 if exc and ("BBHandledException" in exc or "SystemExit" in exc): 491 if exc and ("BBHandledException" in exc or "SystemExit" in exc):
@@ -429,6 +518,7 @@ class BitBakeProcessServerConnection(object):
429 self.socket_connection = sock 518 self.socket_connection = sock
430 519
431 def terminate(self): 520 def terminate(self):
521 self.events.close()
432 self.socket_connection.close() 522 self.socket_connection.close()
433 self.connection.connection.close() 523 self.connection.connection.close()
434 self.connection.recv.close() 524 self.connection.recv.close()
@@ -439,13 +529,14 @@ start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
439 529
440class BitBakeServer(object): 530class BitBakeServer(object):
441 531
442 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface): 532 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
443 533
444 self.server_timeout = server_timeout 534 self.server_timeout = server_timeout
445 self.xmlrpcinterface = xmlrpcinterface 535 self.xmlrpcinterface = xmlrpcinterface
446 self.featureset = featureset 536 self.featureset = featureset
447 self.sockname = sockname 537 self.sockname = sockname
448 self.bitbake_lock = lock 538 self.bitbake_lock = lock
539 self.profile = profile
449 self.readypipe, self.readypipein = os.pipe() 540 self.readypipe, self.readypipein = os.pipe()
450 541
451 # Place the log in the builddirectory alongside the lock file 542 # Place the log in the builddirectory alongside the lock file
@@ -466,7 +557,7 @@ class BitBakeServer(object):
466 try: 557 try:
467 r = ready.get() 558 r = ready.get()
468 except EOFError: 559 except EOFError:
469 # Trap the child exitting/closing the pipe and error out 560 # Trap the child exiting/closing the pipe and error out
470 r = None 561 r = None
471 if not r or r[0] != "r": 562 if not r or r[0] != "r":
472 ready.close() 563 ready.close()
@@ -509,9 +600,9 @@ class BitBakeServer(object):
509 os.set_inheritable(self.bitbake_lock.fileno(), True) 600 os.set_inheritable(self.bitbake_lock.fileno(), True)
510 os.set_inheritable(self.readypipein, True) 601 os.set_inheritable(self.readypipein, True)
511 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") 602 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
512 os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) 603 os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(list(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
513 604
514def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface): 605def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
515 606
516 import bb.cookerdata 607 import bb.cookerdata
517 import bb.cooker 608 import bb.cooker
@@ -523,6 +614,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
523 614
524 # Create server control socket 615 # Create server control socket
525 if os.path.exists(sockname): 616 if os.path.exists(sockname):
617 serverlog("WARNING: removing existing socket file '%s'" % sockname)
526 os.unlink(sockname) 618 os.unlink(sockname)
527 619
528 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 620 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -539,7 +631,8 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
539 writer = ConnectionWriter(readypipeinfd) 631 writer = ConnectionWriter(readypipeinfd)
540 try: 632 try:
541 featureset = [] 633 featureset = []
542 cooker = bb.cooker.BBCooker(featureset, server.register_idle_function) 634 cooker = bb.cooker.BBCooker(featureset, server)
635 cooker.configuration.profile = profile
543 except bb.BBHandledException: 636 except bb.BBHandledException:
544 return None 637 return None
545 writer.send("r") 638 writer.send("r")
@@ -549,7 +642,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc
549 642
550 server.run() 643 server.run()
551 finally: 644 finally:
552 # Flush any ,essages/errors to the logfile before exit 645 # Flush any messages/errors to the logfile before exit
553 sys.stdout.flush() 646 sys.stdout.flush()
554 sys.stderr.flush() 647 sys.stderr.flush()
555 648
@@ -654,23 +747,18 @@ class BBUIEventQueue:
654 self.reader = ConnectionReader(readfd) 747 self.reader = ConnectionReader(readfd)
655 748
656 self.t = threading.Thread() 749 self.t = threading.Thread()
657 self.t.setDaemon(True)
658 self.t.run = self.startCallbackHandler 750 self.t.run = self.startCallbackHandler
659 self.t.start() 751 self.t.start()
660 752
661 def getEvent(self): 753 def getEvent(self):
662 self.eventQueueLock.acquire() 754 with bb.utils.lock_timeout(self.eventQueueLock):
663 755 if len(self.eventQueue) == 0:
664 if len(self.eventQueue) == 0: 756 return None
665 self.eventQueueLock.release()
666 return None
667
668 item = self.eventQueue.pop(0)
669 757
670 if len(self.eventQueue) == 0: 758 item = self.eventQueue.pop(0)
671 self.eventQueueNotify.clear() 759 if len(self.eventQueue) == 0:
760 self.eventQueueNotify.clear()
672 761
673 self.eventQueueLock.release()
674 return item 762 return item
675 763
676 def waitEvent(self, delay): 764 def waitEvent(self, delay):
@@ -678,10 +766,9 @@ class BBUIEventQueue:
678 return self.getEvent() 766 return self.getEvent()
679 767
680 def queue_event(self, event): 768 def queue_event(self, event):
681 self.eventQueueLock.acquire() 769 with bb.utils.lock_timeout(self.eventQueueLock):
682 self.eventQueue.append(event) 770 self.eventQueue.append(event)
683 self.eventQueueNotify.set() 771 self.eventQueueNotify.set()
684 self.eventQueueLock.release()
685 772
686 def send_event(self, event): 773 def send_event(self, event):
687 self.queue_event(pickle.loads(event)) 774 self.queue_event(pickle.loads(event))
@@ -690,13 +777,17 @@ class BBUIEventQueue:
690 bb.utils.set_process_name("UIEventQueue") 777 bb.utils.set_process_name("UIEventQueue")
691 while True: 778 while True:
692 try: 779 try:
693 self.reader.wait() 780 ready = self.reader.wait(0.25)
694 event = self.reader.get() 781 if ready:
695 self.queue_event(event) 782 event = self.reader.get()
696 except EOFError: 783 self.queue_event(event)
784 except (EOFError, OSError, TypeError):
697 # Easiest way to exit is to close the file descriptor to cause an exit 785 # Easiest way to exit is to close the file descriptor to cause an exit
698 break 786 break
787
788 def close(self):
699 self.reader.close() 789 self.reader.close()
790 self.t.join()
700 791
701class ConnectionReader(object): 792class ConnectionReader(object):
702 793
@@ -711,7 +802,7 @@ class ConnectionReader(object):
711 return self.reader.poll(timeout) 802 return self.reader.poll(timeout)
712 803
713 def get(self): 804 def get(self):
714 with self.rlock: 805 with bb.utils.lock_timeout(self.rlock):
715 res = self.reader.recv_bytes() 806 res = self.reader.recv_bytes()
716 return multiprocessing.reduction.ForkingPickler.loads(res) 807 return multiprocessing.reduction.ForkingPickler.loads(res)
717 808
@@ -730,10 +821,31 @@ class ConnectionWriter(object):
730 # Why bb.event needs this I have no idea 821 # Why bb.event needs this I have no idea
731 self.event = self 822 self.event = self
732 823
824 def _send(self, obj):
825 gc.disable()
826 with bb.utils.lock_timeout(self.wlock):
827 self.writer.send_bytes(obj)
828 gc.enable()
829
733 def send(self, obj): 830 def send(self, obj):
734 obj = multiprocessing.reduction.ForkingPickler.dumps(obj) 831 obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
735 with self.wlock: 832 # See notes/code in CookerParser
736 self.writer.send_bytes(obj) 833 # We must not terminate holding this lock else processes will hang.
834 # For SIGTERM, raising afterwards avoids this.
835 # For SIGINT, we don't want to have written partial data to the pipe.
836 # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
837 process = multiprocessing.current_process()
838 if process and hasattr(process, "queue_signals"):
839 with bb.utils.lock_timeout(process.signal_threadlock):
840 process.queue_signals = True
841 self._send(obj)
842 process.queue_signals = False
843
844 while len(process.signal_received) > 0:
845 sig = process.signal_received.pop()
846 process.handle_sig(sig, None)
847 else:
848 self._send(obj)
737 849
738 def fileno(self): 850 def fileno(self):
739 return self.writer.fileno() 851 return self.writer.fileno()