diff options
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r-- | bitbake/lib/bb/server/process.py | 380 |
1 files changed, 267 insertions, 113 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py index b27b4aefe0..76b189291d 100644 --- a/bitbake/lib/bb/server/process.py +++ b/bitbake/lib/bb/server/process.py | |||
@@ -26,6 +26,9 @@ import errno | |||
26 | import re | 26 | import re |
27 | import datetime | 27 | import datetime |
28 | import pickle | 28 | import pickle |
29 | import traceback | ||
30 | import gc | ||
31 | import stat | ||
29 | import bb.server.xmlrpcserver | 32 | import bb.server.xmlrpcserver |
30 | from bb import daemonize | 33 | from bb import daemonize |
31 | from multiprocessing import queues | 34 | from multiprocessing import queues |
@@ -35,9 +38,46 @@ logger = logging.getLogger('BitBake') | |||
35 | class ProcessTimeout(SystemExit): | 38 | class ProcessTimeout(SystemExit): |
36 | pass | 39 | pass |
37 | 40 | ||
41 | def currenttime(): | ||
42 | return datetime.datetime.now().strftime('%H:%M:%S.%f') | ||
43 | |||
38 | def serverlog(msg): | 44 | def serverlog(msg): |
39 | print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg) | 45 | print(str(os.getpid()) + " " + currenttime() + " " + msg) |
40 | sys.stdout.flush() | 46 | #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server |
47 | #sys.stdout.flush() | ||
48 | |||
49 | # | ||
50 | # When we have lockfile issues, try and find infomation about which process is | ||
51 | # using the lockfile | ||
52 | # | ||
53 | def get_lockfile_process_msg(lockfile): | ||
54 | # Some systems may not have lsof available | ||
55 | procs = None | ||
56 | try: | ||
57 | procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) | ||
58 | except subprocess.CalledProcessError: | ||
59 | # File was deleted? | ||
60 | pass | ||
61 | except OSError as e: | ||
62 | if e.errno != errno.ENOENT: | ||
63 | raise | ||
64 | if procs is None: | ||
65 | # Fall back to fuser if lsof is unavailable | ||
66 | try: | ||
67 | procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) | ||
68 | except subprocess.CalledProcessError: | ||
69 | # File was deleted? | ||
70 | pass | ||
71 | except OSError as e: | ||
72 | if e.errno != errno.ENOENT: | ||
73 | raise | ||
74 | if procs: | ||
75 | return procs.decode("utf-8") | ||
76 | return None | ||
77 | |||
78 | class idleFinish(): | ||
79 | def __init__(self, msg): | ||
80 | self.msg = msg | ||
41 | 81 | ||
42 | class ProcessServer(): | 82 | class ProcessServer(): |
43 | profile_filename = "profile.log" | 83 | profile_filename = "profile.log" |
@@ -56,12 +96,19 @@ class ProcessServer(): | |||
56 | self.maxuiwait = 30 | 96 | self.maxuiwait = 30 |
57 | self.xmlrpc = False | 97 | self.xmlrpc = False |
58 | 98 | ||
99 | self.idle = None | ||
100 | # Need a lock for _idlefuns changes | ||
59 | self._idlefuns = {} | 101 | self._idlefuns = {} |
102 | self._idlefuncsLock = threading.Lock() | ||
103 | self.idle_cond = threading.Condition(self._idlefuncsLock) | ||
60 | 104 | ||
61 | self.bitbake_lock = lock | 105 | self.bitbake_lock = lock |
62 | self.bitbake_lock_name = lockname | 106 | self.bitbake_lock_name = lockname |
63 | self.sock = sock | 107 | self.sock = sock |
64 | self.sockname = sockname | 108 | self.sockname = sockname |
109 | # It is possible the directory may be renamed. Cache the inode of the socket file | ||
110 | # so we can tell if things changed. | ||
111 | self.sockinode = os.stat(self.sockname)[stat.ST_INO] | ||
65 | 112 | ||
66 | self.server_timeout = server_timeout | 113 | self.server_timeout = server_timeout |
67 | self.timeout = self.server_timeout | 114 | self.timeout = self.server_timeout |
@@ -70,7 +117,9 @@ class ProcessServer(): | |||
70 | def register_idle_function(self, function, data): | 117 | def register_idle_function(self, function, data): |
71 | """Register a function to be called while the server is idle""" | 118 | """Register a function to be called while the server is idle""" |
72 | assert hasattr(function, '__call__') | 119 | assert hasattr(function, '__call__') |
73 | self._idlefuns[function] = data | 120 | with bb.utils.lock_timeout(self._idlefuncsLock): |
121 | self._idlefuns[function] = data | ||
122 | serverlog("Registering idle function %s" % str(function)) | ||
74 | 123 | ||
75 | def run(self): | 124 | def run(self): |
76 | 125 | ||
@@ -109,6 +158,31 @@ class ProcessServer(): | |||
109 | 158 | ||
110 | return ret | 159 | return ret |
111 | 160 | ||
161 | def _idle_check(self): | ||
162 | return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None | ||
163 | |||
164 | def wait_for_idle(self, timeout=30): | ||
165 | # Wait for the idle loop to have cleared | ||
166 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
167 | return self.idle_cond.wait_for(self._idle_check, timeout) is not False | ||
168 | |||
169 | def set_async_cmd(self, cmd): | ||
170 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
171 | ret = self.idle_cond.wait_for(self._idle_check, 30) | ||
172 | if ret is False: | ||
173 | return False | ||
174 | self.cooker.command.currentAsyncCommand = cmd | ||
175 | return True | ||
176 | |||
177 | def clear_async_cmd(self): | ||
178 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
179 | self.cooker.command.currentAsyncCommand = None | ||
180 | self.idle_cond.notify_all() | ||
181 | |||
182 | def get_async_cmd(self): | ||
183 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
184 | return self.cooker.command.currentAsyncCommand | ||
185 | |||
112 | def main(self): | 186 | def main(self): |
113 | self.cooker.pre_serve() | 187 | self.cooker.pre_serve() |
114 | 188 | ||
@@ -123,14 +197,19 @@ class ProcessServer(): | |||
123 | fds.append(self.xmlrpc) | 197 | fds.append(self.xmlrpc) |
124 | seendata = False | 198 | seendata = False |
125 | serverlog("Entering server connection loop") | 199 | serverlog("Entering server connection loop") |
200 | serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname))) | ||
126 | 201 | ||
127 | def disconnect_client(self, fds): | 202 | def disconnect_client(self, fds): |
128 | serverlog("Disconnecting Client") | 203 | serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname)) |
129 | if self.controllersock: | 204 | if self.controllersock: |
130 | fds.remove(self.controllersock) | 205 | fds.remove(self.controllersock) |
131 | self.controllersock.close() | 206 | self.controllersock.close() |
132 | self.controllersock = False | 207 | self.controllersock = False |
133 | if self.haveui: | 208 | if self.haveui: |
209 | # Wait for the idle loop to have cleared (30s max) | ||
210 | if not self.wait_for_idle(30): | ||
211 | serverlog("Idle loop didn't finish queued commands after 30s, exiting.") | ||
212 | self.quit = True | ||
134 | fds.remove(self.command_channel) | 213 | fds.remove(self.command_channel) |
135 | bb.event.unregister_UIHhandler(self.event_handle, True) | 214 | bb.event.unregister_UIHhandler(self.event_handle, True) |
136 | self.command_channel_reply.writer.close() | 215 | self.command_channel_reply.writer.close() |
@@ -142,12 +221,12 @@ class ProcessServer(): | |||
142 | self.cooker.clientComplete() | 221 | self.cooker.clientComplete() |
143 | self.haveui = False | 222 | self.haveui = False |
144 | ready = select.select(fds,[],[],0)[0] | 223 | ready = select.select(fds,[],[],0)[0] |
145 | if newconnections: | 224 | if newconnections and not self.quit: |
146 | serverlog("Starting new client") | 225 | serverlog("Starting new client") |
147 | conn = newconnections.pop(-1) | 226 | conn = newconnections.pop(-1) |
148 | fds.append(conn) | 227 | fds.append(conn) |
149 | self.controllersock = conn | 228 | self.controllersock = conn |
150 | elif self.timeout is None and not ready: | 229 | elif not self.timeout and not ready: |
151 | serverlog("No timeout, exiting.") | 230 | serverlog("No timeout, exiting.") |
152 | self.quit = True | 231 | self.quit = True |
153 | 232 | ||
@@ -214,11 +293,14 @@ class ProcessServer(): | |||
214 | continue | 293 | continue |
215 | try: | 294 | try: |
216 | serverlog("Running command %s" % command) | 295 | serverlog("Running command %s" % command) |
217 | self.command_channel_reply.send(self.cooker.command.runCommand(command)) | 296 | reply = self.cooker.command.runCommand(command, self) |
218 | serverlog("Command Completed") | 297 | serverlog("Sending reply %s" % repr(reply)) |
298 | self.command_channel_reply.send(reply) | ||
299 | serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname)) | ||
219 | except Exception as e: | 300 | except Exception as e: |
220 | serverlog('Exception in server main event loop running command %s (%s)' % (command, str(e))) | 301 | stack = traceback.format_exc() |
221 | logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e))) | 302 | serverlog('Exception in server main event loop running command %s (%s)' % (command, stack)) |
303 | logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack)) | ||
222 | 304 | ||
223 | if self.xmlrpc in ready: | 305 | if self.xmlrpc in ready: |
224 | self.xmlrpc.handle_requests() | 306 | self.xmlrpc.handle_requests() |
@@ -241,19 +323,25 @@ class ProcessServer(): | |||
241 | 323 | ||
242 | ready = self.idle_commands(.1, fds) | 324 | ready = self.idle_commands(.1, fds) |
243 | 325 | ||
244 | if len(threading.enumerate()) != 1: | 326 | if self.idle: |
245 | serverlog("More than one thread left?: " + str(threading.enumerate())) | 327 | self.idle.join() |
246 | 328 | ||
247 | serverlog("Exiting") | 329 | serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname)) |
248 | # Remove the socket file so we don't get any more connections to avoid races | 330 | # Remove the socket file so we don't get any more connections to avoid races |
331 | # The build directory could have been renamed so if the file isn't the one we created | ||
332 | # we shouldn't delete it. | ||
249 | try: | 333 | try: |
250 | os.unlink(self.sockname) | 334 | sockinode = os.stat(self.sockname)[stat.ST_INO] |
251 | except: | 335 | if sockinode == self.sockinode: |
252 | pass | 336 | os.unlink(self.sockname) |
337 | else: | ||
338 | serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode)) | ||
339 | except Exception as err: | ||
340 | serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err)) | ||
253 | self.sock.close() | 341 | self.sock.close() |
254 | 342 | ||
255 | try: | 343 | try: |
256 | self.cooker.shutdown(True) | 344 | self.cooker.shutdown(True, idle=False) |
257 | self.cooker.notifier.stop() | 345 | self.cooker.notifier.stop() |
258 | self.cooker.confignotifier.stop() | 346 | self.cooker.confignotifier.stop() |
259 | except: | 347 | except: |
@@ -261,6 +349,9 @@ class ProcessServer(): | |||
261 | 349 | ||
262 | self.cooker.post_serve() | 350 | self.cooker.post_serve() |
263 | 351 | ||
352 | if len(threading.enumerate()) != 1: | ||
353 | serverlog("More than one thread left?: " + str(threading.enumerate())) | ||
354 | |||
264 | # Flush logs before we release the lock | 355 | # Flush logs before we release the lock |
265 | sys.stdout.flush() | 356 | sys.stdout.flush() |
266 | sys.stderr.flush() | 357 | sys.stderr.flush() |
@@ -276,20 +367,21 @@ class ProcessServer(): | |||
276 | except FileNotFoundError: | 367 | except FileNotFoundError: |
277 | return None | 368 | return None |
278 | 369 | ||
279 | lockcontents = get_lock_contents(lockfile) | ||
280 | serverlog("Original lockfile contents: " + str(lockcontents)) | ||
281 | |||
282 | lock.close() | 370 | lock.close() |
283 | lock = None | 371 | lock = None |
284 | 372 | ||
285 | while not lock: | 373 | while not lock: |
286 | i = 0 | 374 | i = 0 |
287 | lock = None | 375 | lock = None |
376 | if not os.path.exists(os.path.basename(lockfile)): | ||
377 | serverlog("Lockfile directory gone, exiting.") | ||
378 | return | ||
379 | |||
288 | while not lock and i < 30: | 380 | while not lock and i < 30: |
289 | lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) | 381 | lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) |
290 | if not lock: | 382 | if not lock: |
291 | newlockcontents = get_lock_contents(lockfile) | 383 | newlockcontents = get_lock_contents(lockfile) |
292 | if newlockcontents != lockcontents: | 384 | if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]): |
293 | # A new server was started, the lockfile contents changed, we can exit | 385 | # A new server was started, the lockfile contents changed, we can exit |
294 | serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) | 386 | serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) |
295 | return | 387 | return |
@@ -303,75 +395,108 @@ class ProcessServer(): | |||
303 | return | 395 | return |
304 | 396 | ||
305 | if not lock: | 397 | if not lock: |
306 | # Some systems may not have lsof available | 398 | procs = get_lockfile_process_msg(lockfile) |
307 | procs = None | 399 | msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"] |
400 | if procs: | ||
401 | msg.append(":\n%s" % procs) | ||
402 | serverlog("".join(msg)) | ||
403 | |||
404 | def idle_thread(self): | ||
405 | if self.cooker.configuration.profile: | ||
406 | try: | ||
407 | import cProfile as profile | ||
408 | except: | ||
409 | import profile | ||
410 | prof = profile.Profile() | ||
411 | |||
412 | ret = profile.Profile.runcall(prof, self.idle_thread_internal) | ||
413 | |||
414 | prof.dump_stats("profile-mainloop.log") | ||
415 | bb.utils.process_profilelog("profile-mainloop.log") | ||
416 | serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed") | ||
417 | else: | ||
418 | self.idle_thread_internal() | ||
419 | |||
420 | def idle_thread_internal(self): | ||
421 | def remove_idle_func(function): | ||
422 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
423 | del self._idlefuns[function] | ||
424 | self.idle_cond.notify_all() | ||
425 | |||
426 | while not self.quit: | ||
427 | nextsleep = 0.1 | ||
428 | fds = [] | ||
429 | |||
430 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
431 | items = list(self._idlefuns.items()) | ||
432 | |||
433 | for function, data in items: | ||
308 | try: | 434 | try: |
309 | procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) | 435 | retval = function(self, data, False) |
310 | except subprocess.CalledProcessError: | 436 | if isinstance(retval, idleFinish): |
311 | # File was deleted? | 437 | serverlog("Removing idle function %s at idleFinish" % str(function)) |
312 | continue | 438 | remove_idle_func(function) |
313 | except OSError as e: | 439 | self.cooker.command.finishAsyncCommand(retval.msg) |
314 | if e.errno != errno.ENOENT: | 440 | nextsleep = None |
315 | raise | 441 | elif retval is False: |
316 | if procs is None: | 442 | serverlog("Removing idle function %s" % str(function)) |
317 | # Fall back to fuser if lsof is unavailable | 443 | remove_idle_func(function) |
318 | try: | 444 | nextsleep = None |
319 | procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) | 445 | elif retval is True: |
320 | except subprocess.CalledProcessError: | 446 | nextsleep = None |
321 | # File was deleted? | 447 | elif isinstance(retval, float) and nextsleep: |
448 | if (retval < nextsleep): | ||
449 | nextsleep = retval | ||
450 | elif nextsleep is None: | ||
322 | continue | 451 | continue |
323 | except OSError as e: | 452 | else: |
324 | if e.errno != errno.ENOENT: | 453 | fds = fds + retval |
325 | raise | 454 | except SystemExit: |
455 | raise | ||
456 | except Exception as exc: | ||
457 | if not isinstance(exc, bb.BBHandledException): | ||
458 | logger.exception('Running idle function') | ||
459 | remove_idle_func(function) | ||
460 | serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc()) | ||
461 | self.quit = True | ||
326 | 462 | ||
327 | msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock" | 463 | # Create new heartbeat event? |
328 | if procs: | 464 | now = time.time() |
329 | msg += ":\n%s" % str(procs.decode("utf-8")) | 465 | if bb.event._heartbeat_enabled and now >= self.next_heartbeat: |
330 | serverlog(msg) | 466 | # We might have missed heartbeats. Just trigger once in |
467 | # that case and continue after the usual delay. | ||
468 | self.next_heartbeat += self.heartbeat_seconds | ||
469 | if self.next_heartbeat <= now: | ||
470 | self.next_heartbeat = now + self.heartbeat_seconds | ||
471 | if hasattr(self.cooker, "data"): | ||
472 | heartbeat = bb.event.HeartbeatEvent(now) | ||
473 | try: | ||
474 | bb.event.fire(heartbeat, self.cooker.data) | ||
475 | except Exception as exc: | ||
476 | if not isinstance(exc, bb.BBHandledException): | ||
477 | logger.exception('Running heartbeat function') | ||
478 | serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc()) | ||
479 | self.quit = True | ||
480 | if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat: | ||
481 | # Shorten timeout so that we we wake up in time for | ||
482 | # the heartbeat. | ||
483 | nextsleep = self.next_heartbeat - now | ||
484 | |||
485 | if nextsleep is not None: | ||
486 | select.select(fds,[],[],nextsleep)[0] | ||
331 | 487 | ||
332 | def idle_commands(self, delay, fds=None): | 488 | def idle_commands(self, delay, fds=None): |
333 | nextsleep = delay | 489 | nextsleep = delay |
334 | if not fds: | 490 | if not fds: |
335 | fds = [] | 491 | fds = [] |
336 | 492 | ||
337 | for function, data in list(self._idlefuns.items()): | 493 | if not self.idle: |
338 | try: | 494 | self.idle = threading.Thread(target=self.idle_thread) |
339 | retval = function(self, data, False) | 495 | self.idle.start() |
340 | if retval is False: | 496 | elif self.idle and not self.idle.is_alive(): |
341 | del self._idlefuns[function] | 497 | serverlog("Idle thread terminated, main thread exiting too") |
342 | nextsleep = None | 498 | bb.error("Idle thread terminated, main thread exiting too") |
343 | elif retval is True: | 499 | self.quit = True |
344 | nextsleep = None | ||
345 | elif isinstance(retval, float) and nextsleep: | ||
346 | if (retval < nextsleep): | ||
347 | nextsleep = retval | ||
348 | elif nextsleep is None: | ||
349 | continue | ||
350 | else: | ||
351 | fds = fds + retval | ||
352 | except SystemExit: | ||
353 | raise | ||
354 | except Exception as exc: | ||
355 | if not isinstance(exc, bb.BBHandledException): | ||
356 | logger.exception('Running idle function') | ||
357 | del self._idlefuns[function] | ||
358 | self.quit = True | ||
359 | |||
360 | # Create new heartbeat event? | ||
361 | now = time.time() | ||
362 | if now >= self.next_heartbeat: | ||
363 | # We might have missed heartbeats. Just trigger once in | ||
364 | # that case and continue after the usual delay. | ||
365 | self.next_heartbeat += self.heartbeat_seconds | ||
366 | if self.next_heartbeat <= now: | ||
367 | self.next_heartbeat = now + self.heartbeat_seconds | ||
368 | if hasattr(self.cooker, "data"): | ||
369 | heartbeat = bb.event.HeartbeatEvent(now) | ||
370 | bb.event.fire(heartbeat, self.cooker.data) | ||
371 | if nextsleep and now + nextsleep > self.next_heartbeat: | ||
372 | # Shorten timeout so that we we wake up in time for | ||
373 | # the heartbeat. | ||
374 | nextsleep = self.next_heartbeat - now | ||
375 | 500 | ||
376 | if nextsleep is not None: | 501 | if nextsleep is not None: |
377 | if self.xmlrpc: | 502 | if self.xmlrpc: |
@@ -391,12 +516,18 @@ class ServerCommunicator(): | |||
391 | self.recv = recv | 516 | self.recv = recv |
392 | 517 | ||
393 | def runCommand(self, command): | 518 | def runCommand(self, command): |
394 | self.connection.send(command) | 519 | try: |
520 | self.connection.send(command) | ||
521 | except BrokenPipeError as e: | ||
522 | raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e | ||
395 | if not self.recv.poll(30): | 523 | if not self.recv.poll(30): |
396 | logger.info("No reply from server in 30s") | 524 | logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime())) |
397 | if not self.recv.poll(30): | 525 | if not self.recv.poll(30): |
398 | raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)") | 526 | raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime()) |
399 | ret, exc = self.recv.get() | 527 | try: |
528 | ret, exc = self.recv.get() | ||
529 | except EOFError as e: | ||
530 | raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e | ||
400 | # Should probably turn all exceptions in exc back into exceptions? | 531 | # Should probably turn all exceptions in exc back into exceptions? |
401 | # For now, at least handle BBHandledException | 532 | # For now, at least handle BBHandledException |
402 | if exc and ("BBHandledException" in exc or "SystemExit" in exc): | 533 | if exc and ("BBHandledException" in exc or "SystemExit" in exc): |
@@ -429,6 +560,7 @@ class BitBakeProcessServerConnection(object): | |||
429 | self.socket_connection = sock | 560 | self.socket_connection = sock |
430 | 561 | ||
431 | def terminate(self): | 562 | def terminate(self): |
563 | self.events.close() | ||
432 | self.socket_connection.close() | 564 | self.socket_connection.close() |
433 | self.connection.connection.close() | 565 | self.connection.connection.close() |
434 | self.connection.recv.close() | 566 | self.connection.recv.close() |
@@ -439,13 +571,14 @@ start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' | |||
439 | 571 | ||
440 | class BitBakeServer(object): | 572 | class BitBakeServer(object): |
441 | 573 | ||
442 | def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface): | 574 | def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile): |
443 | 575 | ||
444 | self.server_timeout = server_timeout | 576 | self.server_timeout = server_timeout |
445 | self.xmlrpcinterface = xmlrpcinterface | 577 | self.xmlrpcinterface = xmlrpcinterface |
446 | self.featureset = featureset | 578 | self.featureset = featureset |
447 | self.sockname = sockname | 579 | self.sockname = sockname |
448 | self.bitbake_lock = lock | 580 | self.bitbake_lock = lock |
581 | self.profile = profile | ||
449 | self.readypipe, self.readypipein = os.pipe() | 582 | self.readypipe, self.readypipein = os.pipe() |
450 | 583 | ||
451 | # Place the log in the builddirectory alongside the lock file | 584 | # Place the log in the builddirectory alongside the lock file |
@@ -466,7 +599,7 @@ class BitBakeServer(object): | |||
466 | try: | 599 | try: |
467 | r = ready.get() | 600 | r = ready.get() |
468 | except EOFError: | 601 | except EOFError: |
469 | # Trap the child exitting/closing the pipe and error out | 602 | # Trap the child exiting/closing the pipe and error out |
470 | r = None | 603 | r = None |
471 | if not r or r[0] != "r": | 604 | if not r or r[0] != "r": |
472 | ready.close() | 605 | ready.close() |
@@ -509,9 +642,9 @@ class BitBakeServer(object): | |||
509 | os.set_inheritable(self.bitbake_lock.fileno(), True) | 642 | os.set_inheritable(self.bitbake_lock.fileno(), True) |
510 | os.set_inheritable(self.readypipein, True) | 643 | os.set_inheritable(self.readypipein, True) |
511 | serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") | 644 | serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") |
512 | os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) | 645 | os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) |
513 | 646 | ||
514 | def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface): | 647 | def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile): |
515 | 648 | ||
516 | import bb.cookerdata | 649 | import bb.cookerdata |
517 | import bb.cooker | 650 | import bb.cooker |
@@ -523,6 +656,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc | |||
523 | 656 | ||
524 | # Create server control socket | 657 | # Create server control socket |
525 | if os.path.exists(sockname): | 658 | if os.path.exists(sockname): |
659 | serverlog("WARNING: removing existing socket file '%s'" % sockname) | ||
526 | os.unlink(sockname) | 660 | os.unlink(sockname) |
527 | 661 | ||
528 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 662 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
@@ -539,7 +673,8 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc | |||
539 | writer = ConnectionWriter(readypipeinfd) | 673 | writer = ConnectionWriter(readypipeinfd) |
540 | try: | 674 | try: |
541 | featureset = [] | 675 | featureset = [] |
542 | cooker = bb.cooker.BBCooker(featureset, server.register_idle_function) | 676 | cooker = bb.cooker.BBCooker(featureset, server) |
677 | cooker.configuration.profile = profile | ||
543 | except bb.BBHandledException: | 678 | except bb.BBHandledException: |
544 | return None | 679 | return None |
545 | writer.send("r") | 680 | writer.send("r") |
@@ -549,7 +684,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc | |||
549 | 684 | ||
550 | server.run() | 685 | server.run() |
551 | finally: | 686 | finally: |
552 | # Flush any ,essages/errors to the logfile before exit | 687 | # Flush any messages/errors to the logfile before exit |
553 | sys.stdout.flush() | 688 | sys.stdout.flush() |
554 | sys.stderr.flush() | 689 | sys.stderr.flush() |
555 | 690 | ||
@@ -654,23 +789,18 @@ class BBUIEventQueue: | |||
654 | self.reader = ConnectionReader(readfd) | 789 | self.reader = ConnectionReader(readfd) |
655 | 790 | ||
656 | self.t = threading.Thread() | 791 | self.t = threading.Thread() |
657 | self.t.setDaemon(True) | ||
658 | self.t.run = self.startCallbackHandler | 792 | self.t.run = self.startCallbackHandler |
659 | self.t.start() | 793 | self.t.start() |
660 | 794 | ||
661 | def getEvent(self): | 795 | def getEvent(self): |
662 | self.eventQueueLock.acquire() | 796 | with bb.utils.lock_timeout(self.eventQueueLock): |
663 | 797 | if len(self.eventQueue) == 0: | |
664 | if len(self.eventQueue) == 0: | 798 | return None |
665 | self.eventQueueLock.release() | ||
666 | return None | ||
667 | |||
668 | item = self.eventQueue.pop(0) | ||
669 | 799 | ||
670 | if len(self.eventQueue) == 0: | 800 | item = self.eventQueue.pop(0) |
671 | self.eventQueueNotify.clear() | 801 | if len(self.eventQueue) == 0: |
802 | self.eventQueueNotify.clear() | ||
672 | 803 | ||
673 | self.eventQueueLock.release() | ||
674 | return item | 804 | return item |
675 | 805 | ||
676 | def waitEvent(self, delay): | 806 | def waitEvent(self, delay): |
@@ -678,10 +808,9 @@ class BBUIEventQueue: | |||
678 | return self.getEvent() | 808 | return self.getEvent() |
679 | 809 | ||
680 | def queue_event(self, event): | 810 | def queue_event(self, event): |
681 | self.eventQueueLock.acquire() | 811 | with bb.utils.lock_timeout(self.eventQueueLock): |
682 | self.eventQueue.append(event) | 812 | self.eventQueue.append(event) |
683 | self.eventQueueNotify.set() | 813 | self.eventQueueNotify.set() |
684 | self.eventQueueLock.release() | ||
685 | 814 | ||
686 | def send_event(self, event): | 815 | def send_event(self, event): |
687 | self.queue_event(pickle.loads(event)) | 816 | self.queue_event(pickle.loads(event)) |
@@ -690,13 +819,17 @@ class BBUIEventQueue: | |||
690 | bb.utils.set_process_name("UIEventQueue") | 819 | bb.utils.set_process_name("UIEventQueue") |
691 | while True: | 820 | while True: |
692 | try: | 821 | try: |
693 | self.reader.wait() | 822 | ready = self.reader.wait(0.25) |
694 | event = self.reader.get() | 823 | if ready: |
695 | self.queue_event(event) | 824 | event = self.reader.get() |
696 | except EOFError: | 825 | self.queue_event(event) |
826 | except (EOFError, OSError, TypeError): | ||
697 | # Easiest way to exit is to close the file descriptor to cause an exit | 827 | # Easiest way to exit is to close the file descriptor to cause an exit |
698 | break | 828 | break |
829 | |||
830 | def close(self): | ||
699 | self.reader.close() | 831 | self.reader.close() |
832 | self.t.join() | ||
700 | 833 | ||
701 | class ConnectionReader(object): | 834 | class ConnectionReader(object): |
702 | 835 | ||
@@ -711,7 +844,7 @@ class ConnectionReader(object): | |||
711 | return self.reader.poll(timeout) | 844 | return self.reader.poll(timeout) |
712 | 845 | ||
713 | def get(self): | 846 | def get(self): |
714 | with self.rlock: | 847 | with bb.utils.lock_timeout(self.rlock): |
715 | res = self.reader.recv_bytes() | 848 | res = self.reader.recv_bytes() |
716 | return multiprocessing.reduction.ForkingPickler.loads(res) | 849 | return multiprocessing.reduction.ForkingPickler.loads(res) |
717 | 850 | ||
@@ -730,10 +863,31 @@ class ConnectionWriter(object): | |||
730 | # Why bb.event needs this I have no idea | 863 | # Why bb.event needs this I have no idea |
731 | self.event = self | 864 | self.event = self |
732 | 865 | ||
866 | def _send(self, obj): | ||
867 | gc.disable() | ||
868 | with bb.utils.lock_timeout(self.wlock): | ||
869 | self.writer.send_bytes(obj) | ||
870 | gc.enable() | ||
871 | |||
733 | def send(self, obj): | 872 | def send(self, obj): |
734 | obj = multiprocessing.reduction.ForkingPickler.dumps(obj) | 873 | obj = multiprocessing.reduction.ForkingPickler.dumps(obj) |
735 | with self.wlock: | 874 | # See notes/code in CookerParser |
736 | self.writer.send_bytes(obj) | 875 | # We must not terminate holding this lock else processes will hang. |
876 | # For SIGTERM, raising afterwards avoids this. | ||
877 | # For SIGINT, we don't want to have written partial data to the pipe. | ||
878 | # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139 | ||
879 | process = multiprocessing.current_process() | ||
880 | if process and hasattr(process, "queue_signals"): | ||
881 | with bb.utils.lock_timeout(process.signal_threadlock): | ||
882 | process.queue_signals = True | ||
883 | self._send(obj) | ||
884 | process.queue_signals = False | ||
885 | |||
886 | while len(process.signal_received) > 0: | ||
887 | sig = process.signal_received.pop() | ||
888 | process.handle_sig(sig, None) | ||
889 | else: | ||
890 | self._send(obj) | ||
737 | 891 | ||
738 | def fileno(self): | 892 | def fileno(self): |
739 | return self.writer.fileno() | 893 | return self.writer.fileno() |