diff options
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r-- | bitbake/lib/bb/server/process.py | 402 |
1 files changed, 273 insertions, 129 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py index b27b4aefe0..4b35be62cd 100644 --- a/bitbake/lib/bb/server/process.py +++ b/bitbake/lib/bb/server/process.py | |||
@@ -26,6 +26,9 @@ import errno | |||
26 | import re | 26 | import re |
27 | import datetime | 27 | import datetime |
28 | import pickle | 28 | import pickle |
29 | import traceback | ||
30 | import gc | ||
31 | import stat | ||
29 | import bb.server.xmlrpcserver | 32 | import bb.server.xmlrpcserver |
30 | from bb import daemonize | 33 | from bb import daemonize |
31 | from multiprocessing import queues | 34 | from multiprocessing import queues |
@@ -35,9 +38,46 @@ logger = logging.getLogger('BitBake') | |||
35 | class ProcessTimeout(SystemExit): | 38 | class ProcessTimeout(SystemExit): |
36 | pass | 39 | pass |
37 | 40 | ||
41 | def currenttime(): | ||
42 | return datetime.datetime.now().strftime('%H:%M:%S.%f') | ||
43 | |||
38 | def serverlog(msg): | 44 | def serverlog(msg): |
39 | print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg) | 45 | print(str(os.getpid()) + " " + currenttime() + " " + msg) |
40 | sys.stdout.flush() | 46 | #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server |
47 | #sys.stdout.flush() | ||
48 | |||
49 | # | ||
50 | # When we have lockfile issues, try and find infomation about which process is | ||
51 | # using the lockfile | ||
52 | # | ||
53 | def get_lockfile_process_msg(lockfile): | ||
54 | # Some systems may not have lsof available | ||
55 | procs = None | ||
56 | try: | ||
57 | procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) | ||
58 | except subprocess.CalledProcessError: | ||
59 | # File was deleted? | ||
60 | pass | ||
61 | except OSError as e: | ||
62 | if e.errno != errno.ENOENT: | ||
63 | raise | ||
64 | if procs is None: | ||
65 | # Fall back to fuser if lsof is unavailable | ||
66 | try: | ||
67 | procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) | ||
68 | except subprocess.CalledProcessError: | ||
69 | # File was deleted? | ||
70 | pass | ||
71 | except OSError as e: | ||
72 | if e.errno != errno.ENOENT: | ||
73 | raise | ||
74 | if procs: | ||
75 | return procs.decode("utf-8") | ||
76 | return None | ||
77 | |||
78 | class idleFinish(): | ||
79 | def __init__(self, msg): | ||
80 | self.msg = msg | ||
41 | 81 | ||
42 | class ProcessServer(): | 82 | class ProcessServer(): |
43 | profile_filename = "profile.log" | 83 | profile_filename = "profile.log" |
@@ -56,12 +96,19 @@ class ProcessServer(): | |||
56 | self.maxuiwait = 30 | 96 | self.maxuiwait = 30 |
57 | self.xmlrpc = False | 97 | self.xmlrpc = False |
58 | 98 | ||
99 | self.idle = None | ||
100 | # Need a lock for _idlefuns changes | ||
59 | self._idlefuns = {} | 101 | self._idlefuns = {} |
102 | self._idlefuncsLock = threading.Lock() | ||
103 | self.idle_cond = threading.Condition(self._idlefuncsLock) | ||
60 | 104 | ||
61 | self.bitbake_lock = lock | 105 | self.bitbake_lock = lock |
62 | self.bitbake_lock_name = lockname | 106 | self.bitbake_lock_name = lockname |
63 | self.sock = sock | 107 | self.sock = sock |
64 | self.sockname = sockname | 108 | self.sockname = sockname |
109 | # It is possible the directory may be renamed. Cache the inode of the socket file | ||
110 | # so we can tell if things changed. | ||
111 | self.sockinode = os.stat(self.sockname)[stat.ST_INO] | ||
65 | 112 | ||
66 | self.server_timeout = server_timeout | 113 | self.server_timeout = server_timeout |
67 | self.timeout = self.server_timeout | 114 | self.timeout = self.server_timeout |
@@ -70,7 +117,9 @@ class ProcessServer(): | |||
70 | def register_idle_function(self, function, data): | 117 | def register_idle_function(self, function, data): |
71 | """Register a function to be called while the server is idle""" | 118 | """Register a function to be called while the server is idle""" |
72 | assert hasattr(function, '__call__') | 119 | assert hasattr(function, '__call__') |
73 | self._idlefuns[function] = data | 120 | with bb.utils.lock_timeout(self._idlefuncsLock): |
121 | self._idlefuns[function] = data | ||
122 | serverlog("Registering idle function %s" % str(function)) | ||
74 | 123 | ||
75 | def run(self): | 124 | def run(self): |
76 | 125 | ||
@@ -109,6 +158,31 @@ class ProcessServer(): | |||
109 | 158 | ||
110 | return ret | 159 | return ret |
111 | 160 | ||
161 | def _idle_check(self): | ||
162 | return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None | ||
163 | |||
164 | def wait_for_idle(self, timeout=30): | ||
165 | # Wait for the idle loop to have cleared | ||
166 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
167 | return self.idle_cond.wait_for(self._idle_check, timeout) is not False | ||
168 | |||
169 | def set_async_cmd(self, cmd): | ||
170 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
171 | ret = self.idle_cond.wait_for(self._idle_check, 30) | ||
172 | if ret is False: | ||
173 | return False | ||
174 | self.cooker.command.currentAsyncCommand = cmd | ||
175 | return True | ||
176 | |||
177 | def clear_async_cmd(self): | ||
178 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
179 | self.cooker.command.currentAsyncCommand = None | ||
180 | self.idle_cond.notify_all() | ||
181 | |||
182 | def get_async_cmd(self): | ||
183 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
184 | return self.cooker.command.currentAsyncCommand | ||
185 | |||
112 | def main(self): | 186 | def main(self): |
113 | self.cooker.pre_serve() | 187 | self.cooker.pre_serve() |
114 | 188 | ||
@@ -123,14 +197,19 @@ class ProcessServer(): | |||
123 | fds.append(self.xmlrpc) | 197 | fds.append(self.xmlrpc) |
124 | seendata = False | 198 | seendata = False |
125 | serverlog("Entering server connection loop") | 199 | serverlog("Entering server connection loop") |
200 | serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname))) | ||
126 | 201 | ||
127 | def disconnect_client(self, fds): | 202 | def disconnect_client(self, fds): |
128 | serverlog("Disconnecting Client") | 203 | serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname)) |
129 | if self.controllersock: | 204 | if self.controllersock: |
130 | fds.remove(self.controllersock) | 205 | fds.remove(self.controllersock) |
131 | self.controllersock.close() | 206 | self.controllersock.close() |
132 | self.controllersock = False | 207 | self.controllersock = False |
133 | if self.haveui: | 208 | if self.haveui: |
209 | # Wait for the idle loop to have cleared (30s max) | ||
210 | if not self.wait_for_idle(30): | ||
211 | serverlog("Idle loop didn't finish queued commands after 30s, exiting.") | ||
212 | self.quit = True | ||
134 | fds.remove(self.command_channel) | 213 | fds.remove(self.command_channel) |
135 | bb.event.unregister_UIHhandler(self.event_handle, True) | 214 | bb.event.unregister_UIHhandler(self.event_handle, True) |
136 | self.command_channel_reply.writer.close() | 215 | self.command_channel_reply.writer.close() |
@@ -142,12 +221,12 @@ class ProcessServer(): | |||
142 | self.cooker.clientComplete() | 221 | self.cooker.clientComplete() |
143 | self.haveui = False | 222 | self.haveui = False |
144 | ready = select.select(fds,[],[],0)[0] | 223 | ready = select.select(fds,[],[],0)[0] |
145 | if newconnections: | 224 | if newconnections and not self.quit: |
146 | serverlog("Starting new client") | 225 | serverlog("Starting new client") |
147 | conn = newconnections.pop(-1) | 226 | conn = newconnections.pop(-1) |
148 | fds.append(conn) | 227 | fds.append(conn) |
149 | self.controllersock = conn | 228 | self.controllersock = conn |
150 | elif self.timeout is None and not ready: | 229 | elif not self.timeout and not ready: |
151 | serverlog("No timeout, exiting.") | 230 | serverlog("No timeout, exiting.") |
152 | self.quit = True | 231 | self.quit = True |
153 | 232 | ||
@@ -214,11 +293,14 @@ class ProcessServer(): | |||
214 | continue | 293 | continue |
215 | try: | 294 | try: |
216 | serverlog("Running command %s" % command) | 295 | serverlog("Running command %s" % command) |
217 | self.command_channel_reply.send(self.cooker.command.runCommand(command)) | 296 | reply = self.cooker.command.runCommand(command, self) |
218 | serverlog("Command Completed") | 297 | serverlog("Sending reply %s" % repr(reply)) |
298 | self.command_channel_reply.send(reply) | ||
299 | serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname)) | ||
219 | except Exception as e: | 300 | except Exception as e: |
220 | serverlog('Exception in server main event loop running command %s (%s)' % (command, str(e))) | 301 | stack = traceback.format_exc() |
221 | logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e))) | 302 | serverlog('Exception in server main event loop running command %s (%s)' % (command, stack)) |
303 | logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack)) | ||
222 | 304 | ||
223 | if self.xmlrpc in ready: | 305 | if self.xmlrpc in ready: |
224 | self.xmlrpc.handle_requests() | 306 | self.xmlrpc.handle_requests() |
@@ -239,21 +321,42 @@ class ProcessServer(): | |||
239 | bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) | 321 | bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) |
240 | seendata = True | 322 | seendata = True |
241 | 323 | ||
242 | ready = self.idle_commands(.1, fds) | 324 | if not self.idle: |
325 | self.idle = threading.Thread(target=self.idle_thread) | ||
326 | self.idle.start() | ||
327 | elif self.idle and not self.idle.is_alive(): | ||
328 | serverlog("Idle thread terminated, main thread exiting too") | ||
329 | bb.error("Idle thread terminated, main thread exiting too") | ||
330 | self.quit = True | ||
243 | 331 | ||
244 | if len(threading.enumerate()) != 1: | 332 | nextsleep = 1.0 |
245 | serverlog("More than one thread left?: " + str(threading.enumerate())) | 333 | if self.xmlrpc: |
334 | nextsleep = self.xmlrpc.get_timeout(nextsleep) | ||
335 | try: | ||
336 | ready = select.select(fds,[],[],nextsleep)[0] | ||
337 | except InterruptedError: | ||
338 | # Ignore EINTR | ||
339 | ready = [] | ||
340 | |||
341 | if self.idle: | ||
342 | self.idle.join() | ||
246 | 343 | ||
247 | serverlog("Exiting") | 344 | serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname)) |
248 | # Remove the socket file so we don't get any more connections to avoid races | 345 | # Remove the socket file so we don't get any more connections to avoid races |
346 | # The build directory could have been renamed so if the file isn't the one we created | ||
347 | # we shouldn't delete it. | ||
249 | try: | 348 | try: |
250 | os.unlink(self.sockname) | 349 | sockinode = os.stat(self.sockname)[stat.ST_INO] |
251 | except: | 350 | if sockinode == self.sockinode: |
252 | pass | 351 | os.unlink(self.sockname) |
352 | else: | ||
353 | serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode)) | ||
354 | except Exception as err: | ||
355 | serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err)) | ||
253 | self.sock.close() | 356 | self.sock.close() |
254 | 357 | ||
255 | try: | 358 | try: |
256 | self.cooker.shutdown(True) | 359 | self.cooker.shutdown(True, idle=False) |
257 | self.cooker.notifier.stop() | 360 | self.cooker.notifier.stop() |
258 | self.cooker.confignotifier.stop() | 361 | self.cooker.confignotifier.stop() |
259 | except: | 362 | except: |
@@ -261,6 +364,9 @@ class ProcessServer(): | |||
261 | 364 | ||
262 | self.cooker.post_serve() | 365 | self.cooker.post_serve() |
263 | 366 | ||
367 | if len(threading.enumerate()) != 1: | ||
368 | serverlog("More than one thread left?: " + str(threading.enumerate())) | ||
369 | |||
264 | # Flush logs before we release the lock | 370 | # Flush logs before we release the lock |
265 | sys.stdout.flush() | 371 | sys.stdout.flush() |
266 | sys.stderr.flush() | 372 | sys.stderr.flush() |
@@ -276,20 +382,21 @@ class ProcessServer(): | |||
276 | except FileNotFoundError: | 382 | except FileNotFoundError: |
277 | return None | 383 | return None |
278 | 384 | ||
279 | lockcontents = get_lock_contents(lockfile) | ||
280 | serverlog("Original lockfile contents: " + str(lockcontents)) | ||
281 | |||
282 | lock.close() | 385 | lock.close() |
283 | lock = None | 386 | lock = None |
284 | 387 | ||
285 | while not lock: | 388 | while not lock: |
286 | i = 0 | 389 | i = 0 |
287 | lock = None | 390 | lock = None |
391 | if not os.path.exists(os.path.basename(lockfile)): | ||
392 | serverlog("Lockfile directory gone, exiting.") | ||
393 | return | ||
394 | |||
288 | while not lock and i < 30: | 395 | while not lock and i < 30: |
289 | lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) | 396 | lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) |
290 | if not lock: | 397 | if not lock: |
291 | newlockcontents = get_lock_contents(lockfile) | 398 | newlockcontents = get_lock_contents(lockfile) |
292 | if newlockcontents != lockcontents: | 399 | if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]): |
293 | # A new server was started, the lockfile contents changed, we can exit | 400 | # A new server was started, the lockfile contents changed, we can exit |
294 | serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) | 401 | serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) |
295 | return | 402 | return |
@@ -303,87 +410,95 @@ class ProcessServer(): | |||
303 | return | 410 | return |
304 | 411 | ||
305 | if not lock: | 412 | if not lock: |
306 | # Some systems may not have lsof available | 413 | procs = get_lockfile_process_msg(lockfile) |
307 | procs = None | 414 | msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"] |
308 | try: | ||
309 | procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) | ||
310 | except subprocess.CalledProcessError: | ||
311 | # File was deleted? | ||
312 | continue | ||
313 | except OSError as e: | ||
314 | if e.errno != errno.ENOENT: | ||
315 | raise | ||
316 | if procs is None: | ||
317 | # Fall back to fuser if lsof is unavailable | ||
318 | try: | ||
319 | procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) | ||
320 | except subprocess.CalledProcessError: | ||
321 | # File was deleted? | ||
322 | continue | ||
323 | except OSError as e: | ||
324 | if e.errno != errno.ENOENT: | ||
325 | raise | ||
326 | |||
327 | msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock" | ||
328 | if procs: | 415 | if procs: |
329 | msg += ":\n%s" % str(procs.decode("utf-8")) | 416 | msg.append(":\n%s" % procs) |
330 | serverlog(msg) | 417 | serverlog("".join(msg)) |
331 | 418 | ||
332 | def idle_commands(self, delay, fds=None): | 419 | def idle_thread(self): |
333 | nextsleep = delay | 420 | if self.cooker.configuration.profile: |
334 | if not fds: | ||
335 | fds = [] | ||
336 | |||
337 | for function, data in list(self._idlefuns.items()): | ||
338 | try: | 421 | try: |
339 | retval = function(self, data, False) | 422 | import cProfile as profile |
340 | if retval is False: | 423 | except: |
341 | del self._idlefuns[function] | 424 | import profile |
342 | nextsleep = None | 425 | prof = profile.Profile() |
343 | elif retval is True: | ||
344 | nextsleep = None | ||
345 | elif isinstance(retval, float) and nextsleep: | ||
346 | if (retval < nextsleep): | ||
347 | nextsleep = retval | ||
348 | elif nextsleep is None: | ||
349 | continue | ||
350 | else: | ||
351 | fds = fds + retval | ||
352 | except SystemExit: | ||
353 | raise | ||
354 | except Exception as exc: | ||
355 | if not isinstance(exc, bb.BBHandledException): | ||
356 | logger.exception('Running idle function') | ||
357 | del self._idlefuns[function] | ||
358 | self.quit = True | ||
359 | 426 | ||
360 | # Create new heartbeat event? | 427 | ret = profile.Profile.runcall(prof, self.idle_thread_internal) |
361 | now = time.time() | 428 | |
362 | if now >= self.next_heartbeat: | 429 | prof.dump_stats("profile-mainloop.log") |
363 | # We might have missed heartbeats. Just trigger once in | 430 | bb.utils.process_profilelog("profile-mainloop.log") |
364 | # that case and continue after the usual delay. | 431 | serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed") |
365 | self.next_heartbeat += self.heartbeat_seconds | ||
366 | if self.next_heartbeat <= now: | ||
367 | self.next_heartbeat = now + self.heartbeat_seconds | ||
368 | if hasattr(self.cooker, "data"): | ||
369 | heartbeat = bb.event.HeartbeatEvent(now) | ||
370 | bb.event.fire(heartbeat, self.cooker.data) | ||
371 | if nextsleep and now + nextsleep > self.next_heartbeat: | ||
372 | # Shorten timeout so that we we wake up in time for | ||
373 | # the heartbeat. | ||
374 | nextsleep = self.next_heartbeat - now | ||
375 | |||
376 | if nextsleep is not None: | ||
377 | if self.xmlrpc: | ||
378 | nextsleep = self.xmlrpc.get_timeout(nextsleep) | ||
379 | try: | ||
380 | return select.select(fds,[],[],nextsleep)[0] | ||
381 | except InterruptedError: | ||
382 | # Ignore EINTR | ||
383 | return [] | ||
384 | else: | 432 | else: |
385 | return select.select(fds,[],[],0)[0] | 433 | self.idle_thread_internal() |
434 | |||
435 | def idle_thread_internal(self): | ||
436 | def remove_idle_func(function): | ||
437 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
438 | del self._idlefuns[function] | ||
439 | self.idle_cond.notify_all() | ||
440 | |||
441 | while not self.quit: | ||
442 | nextsleep = 1.0 | ||
443 | fds = [] | ||
444 | |||
445 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
446 | items = list(self._idlefuns.items()) | ||
386 | 447 | ||
448 | for function, data in items: | ||
449 | try: | ||
450 | retval = function(self, data, False) | ||
451 | if isinstance(retval, idleFinish): | ||
452 | serverlog("Removing idle function %s at idleFinish" % str(function)) | ||
453 | remove_idle_func(function) | ||
454 | self.cooker.command.finishAsyncCommand(retval.msg) | ||
455 | nextsleep = None | ||
456 | elif retval is False: | ||
457 | serverlog("Removing idle function %s" % str(function)) | ||
458 | remove_idle_func(function) | ||
459 | nextsleep = None | ||
460 | elif retval is True: | ||
461 | nextsleep = None | ||
462 | elif isinstance(retval, float) and nextsleep: | ||
463 | if (retval < nextsleep): | ||
464 | nextsleep = retval | ||
465 | elif nextsleep is None: | ||
466 | continue | ||
467 | else: | ||
468 | fds = fds + retval | ||
469 | except SystemExit: | ||
470 | raise | ||
471 | except Exception as exc: | ||
472 | if not isinstance(exc, bb.BBHandledException): | ||
473 | logger.exception('Running idle function') | ||
474 | remove_idle_func(function) | ||
475 | serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc()) | ||
476 | self.quit = True | ||
477 | |||
478 | # Create new heartbeat event? | ||
479 | now = time.time() | ||
480 | if items and bb.event._heartbeat_enabled and now >= self.next_heartbeat: | ||
481 | # We might have missed heartbeats. Just trigger once in | ||
482 | # that case and continue after the usual delay. | ||
483 | self.next_heartbeat += self.heartbeat_seconds | ||
484 | if self.next_heartbeat <= now: | ||
485 | self.next_heartbeat = now + self.heartbeat_seconds | ||
486 | if hasattr(self.cooker, "data"): | ||
487 | heartbeat = bb.event.HeartbeatEvent(now) | ||
488 | try: | ||
489 | bb.event.fire(heartbeat, self.cooker.data) | ||
490 | except Exception as exc: | ||
491 | if not isinstance(exc, bb.BBHandledException): | ||
492 | logger.exception('Running heartbeat function') | ||
493 | serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc()) | ||
494 | self.quit = True | ||
495 | if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat: | ||
496 | # Shorten timeout so that we we wake up in time for | ||
497 | # the heartbeat. | ||
498 | nextsleep = self.next_heartbeat - now | ||
499 | |||
500 | if nextsleep is not None: | ||
501 | select.select(fds,[],[],nextsleep)[0] | ||
387 | 502 | ||
388 | class ServerCommunicator(): | 503 | class ServerCommunicator(): |
389 | def __init__(self, connection, recv): | 504 | def __init__(self, connection, recv): |
@@ -391,12 +506,18 @@ class ServerCommunicator(): | |||
391 | self.recv = recv | 506 | self.recv = recv |
392 | 507 | ||
393 | def runCommand(self, command): | 508 | def runCommand(self, command): |
394 | self.connection.send(command) | 509 | try: |
510 | self.connection.send(command) | ||
511 | except BrokenPipeError as e: | ||
512 | raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e | ||
395 | if not self.recv.poll(30): | 513 | if not self.recv.poll(30): |
396 | logger.info("No reply from server in 30s") | 514 | logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime())) |
397 | if not self.recv.poll(30): | 515 | if not self.recv.poll(30): |
398 | raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)") | 516 | raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime()) |
399 | ret, exc = self.recv.get() | 517 | try: |
518 | ret, exc = self.recv.get() | ||
519 | except EOFError as e: | ||
520 | raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e | ||
400 | # Should probably turn all exceptions in exc back into exceptions? | 521 | # Should probably turn all exceptions in exc back into exceptions? |
401 | # For now, at least handle BBHandledException | 522 | # For now, at least handle BBHandledException |
402 | if exc and ("BBHandledException" in exc or "SystemExit" in exc): | 523 | if exc and ("BBHandledException" in exc or "SystemExit" in exc): |
@@ -429,6 +550,7 @@ class BitBakeProcessServerConnection(object): | |||
429 | self.socket_connection = sock | 550 | self.socket_connection = sock |
430 | 551 | ||
431 | def terminate(self): | 552 | def terminate(self): |
553 | self.events.close() | ||
432 | self.socket_connection.close() | 554 | self.socket_connection.close() |
433 | self.connection.connection.close() | 555 | self.connection.connection.close() |
434 | self.connection.recv.close() | 556 | self.connection.recv.close() |
@@ -439,13 +561,14 @@ start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' | |||
439 | 561 | ||
440 | class BitBakeServer(object): | 562 | class BitBakeServer(object): |
441 | 563 | ||
442 | def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface): | 564 | def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile): |
443 | 565 | ||
444 | self.server_timeout = server_timeout | 566 | self.server_timeout = server_timeout |
445 | self.xmlrpcinterface = xmlrpcinterface | 567 | self.xmlrpcinterface = xmlrpcinterface |
446 | self.featureset = featureset | 568 | self.featureset = featureset |
447 | self.sockname = sockname | 569 | self.sockname = sockname |
448 | self.bitbake_lock = lock | 570 | self.bitbake_lock = lock |
571 | self.profile = profile | ||
449 | self.readypipe, self.readypipein = os.pipe() | 572 | self.readypipe, self.readypipein = os.pipe() |
450 | 573 | ||
451 | # Place the log in the builddirectory alongside the lock file | 574 | # Place the log in the builddirectory alongside the lock file |
@@ -466,7 +589,7 @@ class BitBakeServer(object): | |||
466 | try: | 589 | try: |
467 | r = ready.get() | 590 | r = ready.get() |
468 | except EOFError: | 591 | except EOFError: |
469 | # Trap the child exitting/closing the pipe and error out | 592 | # Trap the child exiting/closing the pipe and error out |
470 | r = None | 593 | r = None |
471 | if not r or r[0] != "r": | 594 | if not r or r[0] != "r": |
472 | ready.close() | 595 | ready.close() |
@@ -509,9 +632,9 @@ class BitBakeServer(object): | |||
509 | os.set_inheritable(self.bitbake_lock.fileno(), True) | 632 | os.set_inheritable(self.bitbake_lock.fileno(), True) |
510 | os.set_inheritable(self.readypipein, True) | 633 | os.set_inheritable(self.readypipein, True) |
511 | serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") | 634 | serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") |
512 | os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) | 635 | os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) |
513 | 636 | ||
514 | def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface): | 637 | def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile): |
515 | 638 | ||
516 | import bb.cookerdata | 639 | import bb.cookerdata |
517 | import bb.cooker | 640 | import bb.cooker |
@@ -523,6 +646,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc | |||
523 | 646 | ||
524 | # Create server control socket | 647 | # Create server control socket |
525 | if os.path.exists(sockname): | 648 | if os.path.exists(sockname): |
649 | serverlog("WARNING: removing existing socket file '%s'" % sockname) | ||
526 | os.unlink(sockname) | 650 | os.unlink(sockname) |
527 | 651 | ||
528 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 652 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
@@ -539,7 +663,8 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc | |||
539 | writer = ConnectionWriter(readypipeinfd) | 663 | writer = ConnectionWriter(readypipeinfd) |
540 | try: | 664 | try: |
541 | featureset = [] | 665 | featureset = [] |
542 | cooker = bb.cooker.BBCooker(featureset, server.register_idle_function) | 666 | cooker = bb.cooker.BBCooker(featureset, server) |
667 | cooker.configuration.profile = profile | ||
543 | except bb.BBHandledException: | 668 | except bb.BBHandledException: |
544 | return None | 669 | return None |
545 | writer.send("r") | 670 | writer.send("r") |
@@ -549,7 +674,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc | |||
549 | 674 | ||
550 | server.run() | 675 | server.run() |
551 | finally: | 676 | finally: |
552 | # Flush any ,essages/errors to the logfile before exit | 677 | # Flush any messages/errors to the logfile before exit |
553 | sys.stdout.flush() | 678 | sys.stdout.flush() |
554 | sys.stderr.flush() | 679 | sys.stderr.flush() |
555 | 680 | ||
@@ -654,23 +779,18 @@ class BBUIEventQueue: | |||
654 | self.reader = ConnectionReader(readfd) | 779 | self.reader = ConnectionReader(readfd) |
655 | 780 | ||
656 | self.t = threading.Thread() | 781 | self.t = threading.Thread() |
657 | self.t.setDaemon(True) | ||
658 | self.t.run = self.startCallbackHandler | 782 | self.t.run = self.startCallbackHandler |
659 | self.t.start() | 783 | self.t.start() |
660 | 784 | ||
661 | def getEvent(self): | 785 | def getEvent(self): |
662 | self.eventQueueLock.acquire() | 786 | with bb.utils.lock_timeout(self.eventQueueLock): |
663 | 787 | if len(self.eventQueue) == 0: | |
664 | if len(self.eventQueue) == 0: | 788 | return None |
665 | self.eventQueueLock.release() | ||
666 | return None | ||
667 | |||
668 | item = self.eventQueue.pop(0) | ||
669 | 789 | ||
670 | if len(self.eventQueue) == 0: | 790 | item = self.eventQueue.pop(0) |
671 | self.eventQueueNotify.clear() | 791 | if len(self.eventQueue) == 0: |
792 | self.eventQueueNotify.clear() | ||
672 | 793 | ||
673 | self.eventQueueLock.release() | ||
674 | return item | 794 | return item |
675 | 795 | ||
676 | def waitEvent(self, delay): | 796 | def waitEvent(self, delay): |
@@ -678,10 +798,9 @@ class BBUIEventQueue: | |||
678 | return self.getEvent() | 798 | return self.getEvent() |
679 | 799 | ||
680 | def queue_event(self, event): | 800 | def queue_event(self, event): |
681 | self.eventQueueLock.acquire() | 801 | with bb.utils.lock_timeout(self.eventQueueLock): |
682 | self.eventQueue.append(event) | 802 | self.eventQueue.append(event) |
683 | self.eventQueueNotify.set() | 803 | self.eventQueueNotify.set() |
684 | self.eventQueueLock.release() | ||
685 | 804 | ||
686 | def send_event(self, event): | 805 | def send_event(self, event): |
687 | self.queue_event(pickle.loads(event)) | 806 | self.queue_event(pickle.loads(event)) |
@@ -690,13 +809,17 @@ class BBUIEventQueue: | |||
690 | bb.utils.set_process_name("UIEventQueue") | 809 | bb.utils.set_process_name("UIEventQueue") |
691 | while True: | 810 | while True: |
692 | try: | 811 | try: |
693 | self.reader.wait() | 812 | ready = self.reader.wait(0.25) |
694 | event = self.reader.get() | 813 | if ready: |
695 | self.queue_event(event) | 814 | event = self.reader.get() |
696 | except EOFError: | 815 | self.queue_event(event) |
816 | except (EOFError, OSError, TypeError): | ||
697 | # Easiest way to exit is to close the file descriptor to cause an exit | 817 | # Easiest way to exit is to close the file descriptor to cause an exit |
698 | break | 818 | break |
819 | |||
820 | def close(self): | ||
699 | self.reader.close() | 821 | self.reader.close() |
822 | self.t.join() | ||
700 | 823 | ||
701 | class ConnectionReader(object): | 824 | class ConnectionReader(object): |
702 | 825 | ||
@@ -711,7 +834,7 @@ class ConnectionReader(object): | |||
711 | return self.reader.poll(timeout) | 834 | return self.reader.poll(timeout) |
712 | 835 | ||
713 | def get(self): | 836 | def get(self): |
714 | with self.rlock: | 837 | with bb.utils.lock_timeout(self.rlock): |
715 | res = self.reader.recv_bytes() | 838 | res = self.reader.recv_bytes() |
716 | return multiprocessing.reduction.ForkingPickler.loads(res) | 839 | return multiprocessing.reduction.ForkingPickler.loads(res) |
717 | 840 | ||
@@ -730,10 +853,31 @@ class ConnectionWriter(object): | |||
730 | # Why bb.event needs this I have no idea | 853 | # Why bb.event needs this I have no idea |
731 | self.event = self | 854 | self.event = self |
732 | 855 | ||
856 | def _send(self, obj): | ||
857 | gc.disable() | ||
858 | with bb.utils.lock_timeout(self.wlock): | ||
859 | self.writer.send_bytes(obj) | ||
860 | gc.enable() | ||
861 | |||
733 | def send(self, obj): | 862 | def send(self, obj): |
734 | obj = multiprocessing.reduction.ForkingPickler.dumps(obj) | 863 | obj = multiprocessing.reduction.ForkingPickler.dumps(obj) |
735 | with self.wlock: | 864 | # See notes/code in CookerParser |
736 | self.writer.send_bytes(obj) | 865 | # We must not terminate holding this lock else processes will hang. |
866 | # For SIGTERM, raising afterwards avoids this. | ||
867 | # For SIGINT, we don't want to have written partial data to the pipe. | ||
868 | # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139 | ||
869 | process = multiprocessing.current_process() | ||
870 | if process and hasattr(process, "queue_signals"): | ||
871 | with bb.utils.lock_timeout(process.signal_threadlock): | ||
872 | process.queue_signals = True | ||
873 | self._send(obj) | ||
874 | process.queue_signals = False | ||
875 | |||
876 | while len(process.signal_received) > 0: | ||
877 | sig = process.signal_received.pop() | ||
878 | process.handle_sig(sig, None) | ||
879 | else: | ||
880 | self._send(obj) | ||
737 | 881 | ||
738 | def fileno(self): | 882 | def fileno(self): |
739 | return self.writer.fileno() | 883 | return self.writer.fileno() |