diff options
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r-- | bitbake/lib/bb/server/process.py | 404 |
1 files changed, 258 insertions, 146 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py index b27b4aefe0..d0f73590cc 100644 --- a/bitbake/lib/bb/server/process.py +++ b/bitbake/lib/bb/server/process.py | |||
@@ -13,7 +13,7 @@ | |||
13 | import bb | 13 | import bb |
14 | import bb.event | 14 | import bb.event |
15 | import logging | 15 | import logging |
16 | import multiprocessing | 16 | from bb import multiprocessing |
17 | import threading | 17 | import threading |
18 | import array | 18 | import array |
19 | import os | 19 | import os |
@@ -26,6 +26,9 @@ import errno | |||
26 | import re | 26 | import re |
27 | import datetime | 27 | import datetime |
28 | import pickle | 28 | import pickle |
29 | import traceback | ||
30 | import gc | ||
31 | import stat | ||
29 | import bb.server.xmlrpcserver | 32 | import bb.server.xmlrpcserver |
30 | from bb import daemonize | 33 | from bb import daemonize |
31 | from multiprocessing import queues | 34 | from multiprocessing import queues |
@@ -35,14 +38,48 @@ logger = logging.getLogger('BitBake') | |||
35 | class ProcessTimeout(SystemExit): | 38 | class ProcessTimeout(SystemExit): |
36 | pass | 39 | pass |
37 | 40 | ||
41 | def currenttime(): | ||
42 | return datetime.datetime.now().strftime('%H:%M:%S.%f') | ||
43 | |||
38 | def serverlog(msg): | 44 | def serverlog(msg): |
39 | print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg) | 45 | print(str(os.getpid()) + " " + currenttime() + " " + msg) |
40 | sys.stdout.flush() | 46 | #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server |
47 | #sys.stdout.flush() | ||
41 | 48 | ||
42 | class ProcessServer(): | 49 | # |
43 | profile_filename = "profile.log" | 50 | # When we have lockfile issues, try and find infomation about which process is |
44 | profile_processed_filename = "profile.log.processed" | 51 | # using the lockfile |
52 | # | ||
53 | def get_lockfile_process_msg(lockfile): | ||
54 | # Some systems may not have lsof available | ||
55 | procs = None | ||
56 | try: | ||
57 | procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) | ||
58 | except subprocess.CalledProcessError: | ||
59 | # File was deleted? | ||
60 | pass | ||
61 | except OSError as e: | ||
62 | if e.errno != errno.ENOENT: | ||
63 | raise | ||
64 | if procs is None: | ||
65 | # Fall back to fuser if lsof is unavailable | ||
66 | try: | ||
67 | procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) | ||
68 | except subprocess.CalledProcessError: | ||
69 | # File was deleted? | ||
70 | pass | ||
71 | except OSError as e: | ||
72 | if e.errno != errno.ENOENT: | ||
73 | raise | ||
74 | if procs: | ||
75 | return procs.decode("utf-8") | ||
76 | return None | ||
77 | |||
78 | class idleFinish(): | ||
79 | def __init__(self, msg): | ||
80 | self.msg = msg | ||
45 | 81 | ||
82 | class ProcessServer(): | ||
46 | def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface): | 83 | def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface): |
47 | self.command_channel = False | 84 | self.command_channel = False |
48 | self.command_channel_reply = False | 85 | self.command_channel_reply = False |
@@ -56,12 +93,19 @@ class ProcessServer(): | |||
56 | self.maxuiwait = 30 | 93 | self.maxuiwait = 30 |
57 | self.xmlrpc = False | 94 | self.xmlrpc = False |
58 | 95 | ||
96 | self.idle = None | ||
97 | # Need a lock for _idlefuns changes | ||
59 | self._idlefuns = {} | 98 | self._idlefuns = {} |
99 | self._idlefuncsLock = threading.Lock() | ||
100 | self.idle_cond = threading.Condition(self._idlefuncsLock) | ||
60 | 101 | ||
61 | self.bitbake_lock = lock | 102 | self.bitbake_lock = lock |
62 | self.bitbake_lock_name = lockname | 103 | self.bitbake_lock_name = lockname |
63 | self.sock = sock | 104 | self.sock = sock |
64 | self.sockname = sockname | 105 | self.sockname = sockname |
106 | # It is possible the directory may be renamed. Cache the inode of the socket file | ||
107 | # so we can tell if things changed. | ||
108 | self.sockinode = os.stat(self.sockname)[stat.ST_INO] | ||
65 | 109 | ||
66 | self.server_timeout = server_timeout | 110 | self.server_timeout = server_timeout |
67 | self.timeout = self.server_timeout | 111 | self.timeout = self.server_timeout |
@@ -70,7 +114,9 @@ class ProcessServer(): | |||
70 | def register_idle_function(self, function, data): | 114 | def register_idle_function(self, function, data): |
71 | """Register a function to be called while the server is idle""" | 115 | """Register a function to be called while the server is idle""" |
72 | assert hasattr(function, '__call__') | 116 | assert hasattr(function, '__call__') |
73 | self._idlefuns[function] = data | 117 | with bb.utils.lock_timeout(self._idlefuncsLock): |
118 | self._idlefuns[function] = data | ||
119 | serverlog("Registering idle function %s" % str(function)) | ||
74 | 120 | ||
75 | def run(self): | 121 | def run(self): |
76 | 122 | ||
@@ -91,23 +137,32 @@ class ProcessServer(): | |||
91 | serverlog("Error writing to lock file: %s" % str(e)) | 137 | serverlog("Error writing to lock file: %s" % str(e)) |
92 | pass | 138 | pass |
93 | 139 | ||
94 | if self.cooker.configuration.profile: | 140 | return bb.utils.profile_function("main" in self.cooker.configuration.profile, self.main, "profile-mainloop.log") |
95 | try: | ||
96 | import cProfile as profile | ||
97 | except: | ||
98 | import profile | ||
99 | prof = profile.Profile() | ||
100 | 141 | ||
101 | ret = profile.Profile.runcall(prof, self.main) | 142 | def _idle_check(self): |
143 | return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None | ||
102 | 144 | ||
103 | prof.dump_stats("profile.log") | 145 | def wait_for_idle(self, timeout=30): |
104 | bb.utils.process_profilelog("profile.log") | 146 | # Wait for the idle loop to have cleared |
105 | serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed") | 147 | with bb.utils.lock_timeout(self._idlefuncsLock): |
148 | return self.idle_cond.wait_for(self._idle_check, timeout) is not False | ||
106 | 149 | ||
107 | else: | 150 | def set_async_cmd(self, cmd): |
108 | ret = self.main() | 151 | with bb.utils.lock_timeout(self._idlefuncsLock): |
152 | ret = self.idle_cond.wait_for(self._idle_check, 30) | ||
153 | if ret is False: | ||
154 | return False | ||
155 | self.cooker.command.currentAsyncCommand = cmd | ||
156 | return True | ||
109 | 157 | ||
110 | return ret | 158 | def clear_async_cmd(self): |
159 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
160 | self.cooker.command.currentAsyncCommand = None | ||
161 | self.idle_cond.notify_all() | ||
162 | |||
163 | def get_async_cmd(self): | ||
164 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
165 | return self.cooker.command.currentAsyncCommand | ||
111 | 166 | ||
112 | def main(self): | 167 | def main(self): |
113 | self.cooker.pre_serve() | 168 | self.cooker.pre_serve() |
@@ -123,14 +178,19 @@ class ProcessServer(): | |||
123 | fds.append(self.xmlrpc) | 178 | fds.append(self.xmlrpc) |
124 | seendata = False | 179 | seendata = False |
125 | serverlog("Entering server connection loop") | 180 | serverlog("Entering server connection loop") |
181 | serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname))) | ||
126 | 182 | ||
127 | def disconnect_client(self, fds): | 183 | def disconnect_client(self, fds): |
128 | serverlog("Disconnecting Client") | 184 | serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname)) |
129 | if self.controllersock: | 185 | if self.controllersock: |
130 | fds.remove(self.controllersock) | 186 | fds.remove(self.controllersock) |
131 | self.controllersock.close() | 187 | self.controllersock.close() |
132 | self.controllersock = False | 188 | self.controllersock = False |
133 | if self.haveui: | 189 | if self.haveui: |
190 | # Wait for the idle loop to have cleared (30s max) | ||
191 | if not self.wait_for_idle(30): | ||
192 | serverlog("Idle loop didn't finish queued commands after 30s, exiting.") | ||
193 | self.quit = True | ||
134 | fds.remove(self.command_channel) | 194 | fds.remove(self.command_channel) |
135 | bb.event.unregister_UIHhandler(self.event_handle, True) | 195 | bb.event.unregister_UIHhandler(self.event_handle, True) |
136 | self.command_channel_reply.writer.close() | 196 | self.command_channel_reply.writer.close() |
@@ -142,12 +202,12 @@ class ProcessServer(): | |||
142 | self.cooker.clientComplete() | 202 | self.cooker.clientComplete() |
143 | self.haveui = False | 203 | self.haveui = False |
144 | ready = select.select(fds,[],[],0)[0] | 204 | ready = select.select(fds,[],[],0)[0] |
145 | if newconnections: | 205 | if newconnections and not self.quit: |
146 | serverlog("Starting new client") | 206 | serverlog("Starting new client") |
147 | conn = newconnections.pop(-1) | 207 | conn = newconnections.pop(-1) |
148 | fds.append(conn) | 208 | fds.append(conn) |
149 | self.controllersock = conn | 209 | self.controllersock = conn |
150 | elif self.timeout is None and not ready: | 210 | elif not self.timeout and not ready: |
151 | serverlog("No timeout, exiting.") | 211 | serverlog("No timeout, exiting.") |
152 | self.quit = True | 212 | self.quit = True |
153 | 213 | ||
@@ -214,11 +274,14 @@ class ProcessServer(): | |||
214 | continue | 274 | continue |
215 | try: | 275 | try: |
216 | serverlog("Running command %s" % command) | 276 | serverlog("Running command %s" % command) |
217 | self.command_channel_reply.send(self.cooker.command.runCommand(command)) | 277 | reply = self.cooker.command.runCommand(command, self) |
218 | serverlog("Command Completed") | 278 | serverlog("Sending reply %s" % repr(reply)) |
279 | self.command_channel_reply.send(reply) | ||
280 | serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname)) | ||
219 | except Exception as e: | 281 | except Exception as e: |
220 | serverlog('Exception in server main event loop running command %s (%s)' % (command, str(e))) | 282 | stack = traceback.format_exc() |
221 | logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e))) | 283 | serverlog('Exception in server main event loop running command %s (%s)' % (command, stack)) |
284 | logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack)) | ||
222 | 285 | ||
223 | if self.xmlrpc in ready: | 286 | if self.xmlrpc in ready: |
224 | self.xmlrpc.handle_requests() | 287 | self.xmlrpc.handle_requests() |
@@ -239,21 +302,42 @@ class ProcessServer(): | |||
239 | bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) | 302 | bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) |
240 | seendata = True | 303 | seendata = True |
241 | 304 | ||
242 | ready = self.idle_commands(.1, fds) | 305 | if not self.idle: |
306 | self.idle = threading.Thread(target=self.idle_thread) | ||
307 | self.idle.start() | ||
308 | elif self.idle and not self.idle.is_alive(): | ||
309 | serverlog("Idle thread terminated, main thread exiting too") | ||
310 | bb.error("Idle thread terminated, main thread exiting too") | ||
311 | self.quit = True | ||
243 | 312 | ||
244 | if len(threading.enumerate()) != 1: | 313 | nextsleep = 1.0 |
245 | serverlog("More than one thread left?: " + str(threading.enumerate())) | 314 | if self.xmlrpc: |
315 | nextsleep = self.xmlrpc.get_timeout(nextsleep) | ||
316 | try: | ||
317 | ready = select.select(fds,[],[],nextsleep)[0] | ||
318 | except InterruptedError: | ||
319 | # Ignore EINTR | ||
320 | ready = [] | ||
321 | |||
322 | if self.idle: | ||
323 | self.idle.join() | ||
246 | 324 | ||
247 | serverlog("Exiting") | 325 | serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname)) |
248 | # Remove the socket file so we don't get any more connections to avoid races | 326 | # Remove the socket file so we don't get any more connections to avoid races |
327 | # The build directory could have been renamed so if the file isn't the one we created | ||
328 | # we shouldn't delete it. | ||
249 | try: | 329 | try: |
250 | os.unlink(self.sockname) | 330 | sockinode = os.stat(self.sockname)[stat.ST_INO] |
251 | except: | 331 | if sockinode == self.sockinode: |
252 | pass | 332 | os.unlink(self.sockname) |
333 | else: | ||
334 | serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode)) | ||
335 | except Exception as err: | ||
336 | serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err)) | ||
253 | self.sock.close() | 337 | self.sock.close() |
254 | 338 | ||
255 | try: | 339 | try: |
256 | self.cooker.shutdown(True) | 340 | self.cooker.shutdown(True, idle=False) |
257 | self.cooker.notifier.stop() | 341 | self.cooker.notifier.stop() |
258 | self.cooker.confignotifier.stop() | 342 | self.cooker.confignotifier.stop() |
259 | except: | 343 | except: |
@@ -261,6 +345,9 @@ class ProcessServer(): | |||
261 | 345 | ||
262 | self.cooker.post_serve() | 346 | self.cooker.post_serve() |
263 | 347 | ||
348 | if len(threading.enumerate()) != 1: | ||
349 | serverlog("More than one thread left?: " + str(threading.enumerate())) | ||
350 | |||
264 | # Flush logs before we release the lock | 351 | # Flush logs before we release the lock |
265 | sys.stdout.flush() | 352 | sys.stdout.flush() |
266 | sys.stderr.flush() | 353 | sys.stderr.flush() |
@@ -276,20 +363,21 @@ class ProcessServer(): | |||
276 | except FileNotFoundError: | 363 | except FileNotFoundError: |
277 | return None | 364 | return None |
278 | 365 | ||
279 | lockcontents = get_lock_contents(lockfile) | ||
280 | serverlog("Original lockfile contents: " + str(lockcontents)) | ||
281 | |||
282 | lock.close() | 366 | lock.close() |
283 | lock = None | 367 | lock = None |
284 | 368 | ||
285 | while not lock: | 369 | while not lock: |
286 | i = 0 | 370 | i = 0 |
287 | lock = None | 371 | lock = None |
372 | if not os.path.exists(os.path.basename(lockfile)): | ||
373 | serverlog("Lockfile directory gone, exiting.") | ||
374 | return | ||
375 | |||
288 | while not lock and i < 30: | 376 | while not lock and i < 30: |
289 | lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) | 377 | lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) |
290 | if not lock: | 378 | if not lock: |
291 | newlockcontents = get_lock_contents(lockfile) | 379 | newlockcontents = get_lock_contents(lockfile) |
292 | if newlockcontents != lockcontents: | 380 | if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]): |
293 | # A new server was started, the lockfile contents changed, we can exit | 381 | # A new server was started, the lockfile contents changed, we can exit |
294 | serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) | 382 | serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) |
295 | return | 383 | return |
@@ -303,87 +391,82 @@ class ProcessServer(): | |||
303 | return | 391 | return |
304 | 392 | ||
305 | if not lock: | 393 | if not lock: |
306 | # Some systems may not have lsof available | 394 | procs = get_lockfile_process_msg(lockfile) |
307 | procs = None | 395 | msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"] |
308 | try: | ||
309 | procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) | ||
310 | except subprocess.CalledProcessError: | ||
311 | # File was deleted? | ||
312 | continue | ||
313 | except OSError as e: | ||
314 | if e.errno != errno.ENOENT: | ||
315 | raise | ||
316 | if procs is None: | ||
317 | # Fall back to fuser if lsof is unavailable | ||
318 | try: | ||
319 | procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) | ||
320 | except subprocess.CalledProcessError: | ||
321 | # File was deleted? | ||
322 | continue | ||
323 | except OSError as e: | ||
324 | if e.errno != errno.ENOENT: | ||
325 | raise | ||
326 | |||
327 | msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock" | ||
328 | if procs: | 396 | if procs: |
329 | msg += ":\n%s" % str(procs.decode("utf-8")) | 397 | msg.append(":\n%s" % procs) |
330 | serverlog(msg) | 398 | serverlog("".join(msg)) |
331 | 399 | ||
332 | def idle_commands(self, delay, fds=None): | 400 | def idle_thread(self): |
333 | nextsleep = delay | 401 | bb.utils.profile_function("idle" in self.cooker.configuration.profile, self.idle_thread_internal, "profile-idleloop.log") |
334 | if not fds: | ||
335 | fds = [] | ||
336 | 402 | ||
337 | for function, data in list(self._idlefuns.items()): | 403 | def idle_thread_internal(self): |
338 | try: | 404 | def remove_idle_func(function): |
339 | retval = function(self, data, False) | 405 | with bb.utils.lock_timeout(self._idlefuncsLock): |
340 | if retval is False: | ||
341 | del self._idlefuns[function] | ||
342 | nextsleep = None | ||
343 | elif retval is True: | ||
344 | nextsleep = None | ||
345 | elif isinstance(retval, float) and nextsleep: | ||
346 | if (retval < nextsleep): | ||
347 | nextsleep = retval | ||
348 | elif nextsleep is None: | ||
349 | continue | ||
350 | else: | ||
351 | fds = fds + retval | ||
352 | except SystemExit: | ||
353 | raise | ||
354 | except Exception as exc: | ||
355 | if not isinstance(exc, bb.BBHandledException): | ||
356 | logger.exception('Running idle function') | ||
357 | del self._idlefuns[function] | 406 | del self._idlefuns[function] |
358 | self.quit = True | 407 | self.idle_cond.notify_all() |
359 | 408 | ||
360 | # Create new heartbeat event? | 409 | while not self.quit: |
361 | now = time.time() | 410 | nextsleep = 1.0 |
362 | if now >= self.next_heartbeat: | 411 | fds = [] |
363 | # We might have missed heartbeats. Just trigger once in | ||
364 | # that case and continue after the usual delay. | ||
365 | self.next_heartbeat += self.heartbeat_seconds | ||
366 | if self.next_heartbeat <= now: | ||
367 | self.next_heartbeat = now + self.heartbeat_seconds | ||
368 | if hasattr(self.cooker, "data"): | ||
369 | heartbeat = bb.event.HeartbeatEvent(now) | ||
370 | bb.event.fire(heartbeat, self.cooker.data) | ||
371 | if nextsleep and now + nextsleep > self.next_heartbeat: | ||
372 | # Shorten timeout so that we we wake up in time for | ||
373 | # the heartbeat. | ||
374 | nextsleep = self.next_heartbeat - now | ||
375 | |||
376 | if nextsleep is not None: | ||
377 | if self.xmlrpc: | ||
378 | nextsleep = self.xmlrpc.get_timeout(nextsleep) | ||
379 | try: | ||
380 | return select.select(fds,[],[],nextsleep)[0] | ||
381 | except InterruptedError: | ||
382 | # Ignore EINTR | ||
383 | return [] | ||
384 | else: | ||
385 | return select.select(fds,[],[],0)[0] | ||
386 | 412 | ||
413 | with bb.utils.lock_timeout(self._idlefuncsLock): | ||
414 | items = list(self._idlefuns.items()) | ||
415 | |||
416 | for function, data in items: | ||
417 | try: | ||
418 | retval = function(self, data, False) | ||
419 | if isinstance(retval, idleFinish): | ||
420 | serverlog("Removing idle function %s at idleFinish" % str(function)) | ||
421 | remove_idle_func(function) | ||
422 | self.cooker.command.finishAsyncCommand(retval.msg) | ||
423 | nextsleep = None | ||
424 | elif retval is False: | ||
425 | serverlog("Removing idle function %s" % str(function)) | ||
426 | remove_idle_func(function) | ||
427 | nextsleep = None | ||
428 | elif retval is True: | ||
429 | nextsleep = None | ||
430 | elif isinstance(retval, float) and nextsleep: | ||
431 | if (retval < nextsleep): | ||
432 | nextsleep = retval | ||
433 | elif nextsleep is None: | ||
434 | continue | ||
435 | else: | ||
436 | fds = fds + retval | ||
437 | except SystemExit: | ||
438 | raise | ||
439 | except Exception as exc: | ||
440 | if not isinstance(exc, bb.BBHandledException): | ||
441 | logger.exception('Running idle function') | ||
442 | remove_idle_func(function) | ||
443 | serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc()) | ||
444 | self.quit = True | ||
445 | |||
446 | # Create new heartbeat event? | ||
447 | now = time.time() | ||
448 | if items and bb.event._heartbeat_enabled and now >= self.next_heartbeat: | ||
449 | # We might have missed heartbeats. Just trigger once in | ||
450 | # that case and continue after the usual delay. | ||
451 | self.next_heartbeat += self.heartbeat_seconds | ||
452 | if self.next_heartbeat <= now: | ||
453 | self.next_heartbeat = now + self.heartbeat_seconds | ||
454 | if hasattr(self.cooker, "data"): | ||
455 | heartbeat = bb.event.HeartbeatEvent(now) | ||
456 | try: | ||
457 | bb.event.fire(heartbeat, self.cooker.data) | ||
458 | except Exception as exc: | ||
459 | if not isinstance(exc, bb.BBHandledException): | ||
460 | logger.exception('Running heartbeat function') | ||
461 | serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc()) | ||
462 | self.quit = True | ||
463 | if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat: | ||
464 | # Shorten timeout so that we we wake up in time for | ||
465 | # the heartbeat. | ||
466 | nextsleep = self.next_heartbeat - now | ||
467 | |||
468 | if nextsleep is not None: | ||
469 | select.select(fds,[],[],nextsleep)[0] | ||
387 | 470 | ||
388 | class ServerCommunicator(): | 471 | class ServerCommunicator(): |
389 | def __init__(self, connection, recv): | 472 | def __init__(self, connection, recv): |
@@ -391,12 +474,18 @@ class ServerCommunicator(): | |||
391 | self.recv = recv | 474 | self.recv = recv |
392 | 475 | ||
393 | def runCommand(self, command): | 476 | def runCommand(self, command): |
394 | self.connection.send(command) | 477 | try: |
478 | self.connection.send(command) | ||
479 | except BrokenPipeError as e: | ||
480 | raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e | ||
395 | if not self.recv.poll(30): | 481 | if not self.recv.poll(30): |
396 | logger.info("No reply from server in 30s") | 482 | logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime())) |
397 | if not self.recv.poll(30): | 483 | if not self.recv.poll(30): |
398 | raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)") | 484 | raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime()) |
399 | ret, exc = self.recv.get() | 485 | try: |
486 | ret, exc = self.recv.get() | ||
487 | except EOFError as e: | ||
488 | raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e | ||
400 | # Should probably turn all exceptions in exc back into exceptions? | 489 | # Should probably turn all exceptions in exc back into exceptions? |
401 | # For now, at least handle BBHandledException | 490 | # For now, at least handle BBHandledException |
402 | if exc and ("BBHandledException" in exc or "SystemExit" in exc): | 491 | if exc and ("BBHandledException" in exc or "SystemExit" in exc): |
@@ -429,6 +518,7 @@ class BitBakeProcessServerConnection(object): | |||
429 | self.socket_connection = sock | 518 | self.socket_connection = sock |
430 | 519 | ||
431 | def terminate(self): | 520 | def terminate(self): |
521 | self.events.close() | ||
432 | self.socket_connection.close() | 522 | self.socket_connection.close() |
433 | self.connection.connection.close() | 523 | self.connection.connection.close() |
434 | self.connection.recv.close() | 524 | self.connection.recv.close() |
@@ -439,13 +529,14 @@ start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' | |||
439 | 529 | ||
440 | class BitBakeServer(object): | 530 | class BitBakeServer(object): |
441 | 531 | ||
442 | def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface): | 532 | def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile): |
443 | 533 | ||
444 | self.server_timeout = server_timeout | 534 | self.server_timeout = server_timeout |
445 | self.xmlrpcinterface = xmlrpcinterface | 535 | self.xmlrpcinterface = xmlrpcinterface |
446 | self.featureset = featureset | 536 | self.featureset = featureset |
447 | self.sockname = sockname | 537 | self.sockname = sockname |
448 | self.bitbake_lock = lock | 538 | self.bitbake_lock = lock |
539 | self.profile = profile | ||
449 | self.readypipe, self.readypipein = os.pipe() | 540 | self.readypipe, self.readypipein = os.pipe() |
450 | 541 | ||
451 | # Place the log in the builddirectory alongside the lock file | 542 | # Place the log in the builddirectory alongside the lock file |
@@ -466,7 +557,7 @@ class BitBakeServer(object): | |||
466 | try: | 557 | try: |
467 | r = ready.get() | 558 | r = ready.get() |
468 | except EOFError: | 559 | except EOFError: |
469 | # Trap the child exitting/closing the pipe and error out | 560 | # Trap the child exiting/closing the pipe and error out |
470 | r = None | 561 | r = None |
471 | if not r or r[0] != "r": | 562 | if not r or r[0] != "r": |
472 | ready.close() | 563 | ready.close() |
@@ -509,9 +600,9 @@ class BitBakeServer(object): | |||
509 | os.set_inheritable(self.bitbake_lock.fileno(), True) | 600 | os.set_inheritable(self.bitbake_lock.fileno(), True) |
510 | os.set_inheritable(self.readypipein, True) | 601 | os.set_inheritable(self.readypipein, True) |
511 | serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") | 602 | serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") |
512 | os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) | 603 | os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(list(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) |
513 | 604 | ||
514 | def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface): | 605 | def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile): |
515 | 606 | ||
516 | import bb.cookerdata | 607 | import bb.cookerdata |
517 | import bb.cooker | 608 | import bb.cooker |
@@ -523,6 +614,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc | |||
523 | 614 | ||
524 | # Create server control socket | 615 | # Create server control socket |
525 | if os.path.exists(sockname): | 616 | if os.path.exists(sockname): |
617 | serverlog("WARNING: removing existing socket file '%s'" % sockname) | ||
526 | os.unlink(sockname) | 618 | os.unlink(sockname) |
527 | 619 | ||
528 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 620 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
@@ -539,7 +631,8 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc | |||
539 | writer = ConnectionWriter(readypipeinfd) | 631 | writer = ConnectionWriter(readypipeinfd) |
540 | try: | 632 | try: |
541 | featureset = [] | 633 | featureset = [] |
542 | cooker = bb.cooker.BBCooker(featureset, server.register_idle_function) | 634 | cooker = bb.cooker.BBCooker(featureset, server) |
635 | cooker.configuration.profile = profile | ||
543 | except bb.BBHandledException: | 636 | except bb.BBHandledException: |
544 | return None | 637 | return None |
545 | writer.send("r") | 638 | writer.send("r") |
@@ -549,7 +642,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc | |||
549 | 642 | ||
550 | server.run() | 643 | server.run() |
551 | finally: | 644 | finally: |
552 | # Flush any ,essages/errors to the logfile before exit | 645 | # Flush any messages/errors to the logfile before exit |
553 | sys.stdout.flush() | 646 | sys.stdout.flush() |
554 | sys.stderr.flush() | 647 | sys.stderr.flush() |
555 | 648 | ||
@@ -654,23 +747,18 @@ class BBUIEventQueue: | |||
654 | self.reader = ConnectionReader(readfd) | 747 | self.reader = ConnectionReader(readfd) |
655 | 748 | ||
656 | self.t = threading.Thread() | 749 | self.t = threading.Thread() |
657 | self.t.setDaemon(True) | ||
658 | self.t.run = self.startCallbackHandler | 750 | self.t.run = self.startCallbackHandler |
659 | self.t.start() | 751 | self.t.start() |
660 | 752 | ||
661 | def getEvent(self): | 753 | def getEvent(self): |
662 | self.eventQueueLock.acquire() | 754 | with bb.utils.lock_timeout(self.eventQueueLock): |
663 | 755 | if len(self.eventQueue) == 0: | |
664 | if len(self.eventQueue) == 0: | 756 | return None |
665 | self.eventQueueLock.release() | ||
666 | return None | ||
667 | |||
668 | item = self.eventQueue.pop(0) | ||
669 | 757 | ||
670 | if len(self.eventQueue) == 0: | 758 | item = self.eventQueue.pop(0) |
671 | self.eventQueueNotify.clear() | 759 | if len(self.eventQueue) == 0: |
760 | self.eventQueueNotify.clear() | ||
672 | 761 | ||
673 | self.eventQueueLock.release() | ||
674 | return item | 762 | return item |
675 | 763 | ||
676 | def waitEvent(self, delay): | 764 | def waitEvent(self, delay): |
@@ -678,10 +766,9 @@ class BBUIEventQueue: | |||
678 | return self.getEvent() | 766 | return self.getEvent() |
679 | 767 | ||
680 | def queue_event(self, event): | 768 | def queue_event(self, event): |
681 | self.eventQueueLock.acquire() | 769 | with bb.utils.lock_timeout(self.eventQueueLock): |
682 | self.eventQueue.append(event) | 770 | self.eventQueue.append(event) |
683 | self.eventQueueNotify.set() | 771 | self.eventQueueNotify.set() |
684 | self.eventQueueLock.release() | ||
685 | 772 | ||
686 | def send_event(self, event): | 773 | def send_event(self, event): |
687 | self.queue_event(pickle.loads(event)) | 774 | self.queue_event(pickle.loads(event)) |
@@ -690,13 +777,17 @@ class BBUIEventQueue: | |||
690 | bb.utils.set_process_name("UIEventQueue") | 777 | bb.utils.set_process_name("UIEventQueue") |
691 | while True: | 778 | while True: |
692 | try: | 779 | try: |
693 | self.reader.wait() | 780 | ready = self.reader.wait(0.25) |
694 | event = self.reader.get() | 781 | if ready: |
695 | self.queue_event(event) | 782 | event = self.reader.get() |
696 | except EOFError: | 783 | self.queue_event(event) |
784 | except (EOFError, OSError, TypeError): | ||
697 | # Easiest way to exit is to close the file descriptor to cause an exit | 785 | # Easiest way to exit is to close the file descriptor to cause an exit |
698 | break | 786 | break |
787 | |||
788 | def close(self): | ||
699 | self.reader.close() | 789 | self.reader.close() |
790 | self.t.join() | ||
700 | 791 | ||
701 | class ConnectionReader(object): | 792 | class ConnectionReader(object): |
702 | 793 | ||
@@ -711,7 +802,7 @@ class ConnectionReader(object): | |||
711 | return self.reader.poll(timeout) | 802 | return self.reader.poll(timeout) |
712 | 803 | ||
713 | def get(self): | 804 | def get(self): |
714 | with self.rlock: | 805 | with bb.utils.lock_timeout(self.rlock): |
715 | res = self.reader.recv_bytes() | 806 | res = self.reader.recv_bytes() |
716 | return multiprocessing.reduction.ForkingPickler.loads(res) | 807 | return multiprocessing.reduction.ForkingPickler.loads(res) |
717 | 808 | ||
@@ -730,10 +821,31 @@ class ConnectionWriter(object): | |||
730 | # Why bb.event needs this I have no idea | 821 | # Why bb.event needs this I have no idea |
731 | self.event = self | 822 | self.event = self |
732 | 823 | ||
824 | def _send(self, obj): | ||
825 | gc.disable() | ||
826 | with bb.utils.lock_timeout(self.wlock): | ||
827 | self.writer.send_bytes(obj) | ||
828 | gc.enable() | ||
829 | |||
733 | def send(self, obj): | 830 | def send(self, obj): |
734 | obj = multiprocessing.reduction.ForkingPickler.dumps(obj) | 831 | obj = multiprocessing.reduction.ForkingPickler.dumps(obj) |
735 | with self.wlock: | 832 | # See notes/code in CookerParser |
736 | self.writer.send_bytes(obj) | 833 | # We must not terminate holding this lock else processes will hang. |
834 | # For SIGTERM, raising afterwards avoids this. | ||
835 | # For SIGINT, we don't want to have written partial data to the pipe. | ||
836 | # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139 | ||
837 | process = multiprocessing.current_process() | ||
838 | if process and hasattr(process, "queue_signals"): | ||
839 | with bb.utils.lock_timeout(process.signal_threadlock): | ||
840 | process.queue_signals = True | ||
841 | self._send(obj) | ||
842 | process.queue_signals = False | ||
843 | |||
844 | while len(process.signal_received) > 0: | ||
845 | sig = process.signal_received.pop() | ||
846 | process.handle_sig(sig, None) | ||
847 | else: | ||
848 | self._send(obj) | ||
737 | 849 | ||
738 | def fileno(self): | 850 | def fileno(self): |
739 | return self.writer.fileno() | 851 | return self.writer.fileno() |