diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2022-12-30 22:13:45 +0000 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2022-12-31 17:05:17 +0000 |
commit | 4c57c6eeecc43d0479380144f3073e61a8b43375 (patch) | |
tree | 75988728828f7c8528d89815ed1f48353f25d40a | |
parent | 3cc9aed5a59d7b72b98ef40727102c98b031f911 (diff) | |
download | poky-4c57c6eeecc43d0479380144f3073e61a8b43375.tar.gz |
bitbake: server/process: Run idle commands in a separate idle thread
When bitbake is off running heavier "idle" commands, it doesn't service it's
command socket which means stopping/interrupting it is hard. It also means we
can't "ping" from the UI to know if it is still alive.
For those reasons, split idle command execution into it's own thread.
The commands are generally already self containted so this is easier than
expected. We do have to be careful to only handle inotify poll() from a single
thread at a time. It also means we always have to use a thread lock when sending
events since both the idle thread and the command thread may generate log messages
(and hence events). The patch depends on previous fixes to the builtins locking
in event.py and the heartbeat enable/disable changes as well as other locking
additions.
We use a condition to signal from the idle thread when other sections of code
can continue, thanks to Joshua Watt for the review and tweaks squashed into this
patch. We do have some sync points where we need to ensure any currently executing
commands have finished before we can start a new async command for example.
(Bitbake rev: 67dd9a5e84811df8869a82da6a37a41ee8fe94e2)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r-- | bitbake/lib/bb/command.py | 11 | ||||
-rw-r--r-- | bitbake/lib/bb/cooker.py | 29 | ||||
-rw-r--r-- | bitbake/lib/bb/server/process.py | 105 | ||||
-rw-r--r-- | bitbake/lib/bb/server/xmlrpcserver.py | 2 |
4 files changed, 97 insertions, 50 deletions
diff --git a/bitbake/lib/bb/command.py b/bitbake/lib/bb/command.py index 732327d84d..0706b89271 100644 --- a/bitbake/lib/bb/command.py +++ b/bitbake/lib/bb/command.py | |||
@@ -60,7 +60,7 @@ class Command: | |||
60 | # FIXME Add lock for this | 60 | # FIXME Add lock for this |
61 | self.currentAsyncCommand = None | 61 | self.currentAsyncCommand = None |
62 | 62 | ||
63 | def runCommand(self, commandline, ro_only = False): | 63 | def runCommand(self, commandline, process_server, ro_only=False): |
64 | command = commandline.pop(0) | 64 | command = commandline.pop(0) |
65 | 65 | ||
66 | # Ensure cooker is ready for commands | 66 | # Ensure cooker is ready for commands |
@@ -84,7 +84,7 @@ class Command: | |||
84 | if not hasattr(command_method, 'readonly') or not getattr(command_method, 'readonly'): | 84 | if not hasattr(command_method, 'readonly') or not getattr(command_method, 'readonly'): |
85 | return None, "Not able to execute not readonly commands in readonly mode" | 85 | return None, "Not able to execute not readonly commands in readonly mode" |
86 | try: | 86 | try: |
87 | self.cooker.process_inotify_updates() | 87 | self.cooker.process_inotify_updates_apply() |
88 | if getattr(command_method, 'needconfig', True): | 88 | if getattr(command_method, 'needconfig', True): |
89 | self.cooker.updateCacheSync() | 89 | self.cooker.updateCacheSync() |
90 | result = command_method(self, commandline) | 90 | result = command_method(self, commandline) |
@@ -100,7 +100,10 @@ class Command: | |||
100 | else: | 100 | else: |
101 | return result, None | 101 | return result, None |
102 | if self.currentAsyncCommand is not None: | 102 | if self.currentAsyncCommand is not None: |
103 | return None, "Busy (%s in progress)" % self.currentAsyncCommand[0] | 103 | # Wait for the idle loop to have cleared (30s max) |
104 | process_server.wait_for_idle(timeout=30) | ||
105 | if self.currentAsyncCommand is not None: | ||
106 | return None, "Busy (%s in progress)" % self.currentAsyncCommand[0] | ||
104 | if command not in CommandsAsync.__dict__: | 107 | if command not in CommandsAsync.__dict__: |
105 | return None, "No such command" | 108 | return None, "No such command" |
106 | self.currentAsyncCommand = (command, commandline) | 109 | self.currentAsyncCommand = (command, commandline) |
@@ -109,7 +112,7 @@ class Command: | |||
109 | 112 | ||
110 | def runAsyncCommand(self): | 113 | def runAsyncCommand(self): |
111 | try: | 114 | try: |
112 | self.cooker.process_inotify_updates() | 115 | self.cooker.process_inotify_updates_apply() |
113 | if self.cooker.state in (bb.cooker.state.error, bb.cooker.state.shutdown, bb.cooker.state.forceshutdown): | 116 | if self.cooker.state in (bb.cooker.state.error, bb.cooker.state.shutdown, bb.cooker.state.forceshutdown): |
114 | # updateCache will trigger a shutdown of the parser | 117 | # updateCache will trigger a shutdown of the parser |
115 | # and then raise BBHandledException triggering an exit | 118 | # and then raise BBHandledException triggering an exit |
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index ded9369787..a5a635858c 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py | |||
@@ -149,7 +149,7 @@ class BBCooker: | |||
149 | Manages one bitbake build run | 149 | Manages one bitbake build run |
150 | """ | 150 | """ |
151 | 151 | ||
152 | def __init__(self, featureSet=None, idleCallBackRegister=None): | 152 | def __init__(self, featureSet=None, idleCallBackRegister=None, waitIdle=None): |
153 | self.recipecaches = None | 153 | self.recipecaches = None |
154 | self.eventlog = None | 154 | self.eventlog = None |
155 | self.skiplist = {} | 155 | self.skiplist = {} |
@@ -164,6 +164,7 @@ class BBCooker: | |||
164 | self.configuration = bb.cookerdata.CookerConfiguration() | 164 | self.configuration = bb.cookerdata.CookerConfiguration() |
165 | 165 | ||
166 | self.idleCallBackRegister = idleCallBackRegister | 166 | self.idleCallBackRegister = idleCallBackRegister |
167 | self.waitIdle = waitIdle | ||
167 | 168 | ||
168 | bb.debug(1, "BBCooker starting %s" % time.time()) | 169 | bb.debug(1, "BBCooker starting %s" % time.time()) |
169 | sys.stdout.flush() | 170 | sys.stdout.flush() |
@@ -220,6 +221,8 @@ class BBCooker: | |||
220 | bb.debug(1, "BBCooker startup complete %s" % time.time()) | 221 | bb.debug(1, "BBCooker startup complete %s" % time.time()) |
221 | sys.stdout.flush() | 222 | sys.stdout.flush() |
222 | 223 | ||
224 | self.inotify_threadlock = threading.Lock() | ||
225 | |||
223 | def init_configdata(self): | 226 | def init_configdata(self): |
224 | if not hasattr(self, "data"): | 227 | if not hasattr(self, "data"): |
225 | self.initConfigurationData() | 228 | self.initConfigurationData() |
@@ -248,11 +251,18 @@ class BBCooker: | |||
248 | self.notifier = pyinotify.Notifier(self.watcher, self.notifications) | 251 | self.notifier = pyinotify.Notifier(self.watcher, self.notifications) |
249 | 252 | ||
250 | def process_inotify_updates(self): | 253 | def process_inotify_updates(self): |
251 | for n in [self.confignotifier, self.notifier]: | 254 | with self.inotify_threadlock: |
252 | if n and n.check_events(timeout=0): | 255 | for n in [self.confignotifier, self.notifier]: |
253 | # read notified events and enqueue them | 256 | if n and n.check_events(timeout=0): |
254 | n.read_events() | 257 | # read notified events and enqueue them |
255 | n.process_events() | 258 | n.read_events() |
259 | |||
260 | def process_inotify_updates_apply(self): | ||
261 | with self.inotify_threadlock: | ||
262 | for n in [self.confignotifier, self.notifier]: | ||
263 | if n and n.check_events(timeout=0): | ||
264 | n.read_events() | ||
265 | n.process_events() | ||
256 | 266 | ||
257 | def config_notifications(self, event): | 267 | def config_notifications(self, event): |
258 | if event.maskname == "IN_Q_OVERFLOW": | 268 | if event.maskname == "IN_Q_OVERFLOW": |
@@ -1744,7 +1754,7 @@ class BBCooker: | |||
1744 | return | 1754 | return |
1745 | 1755 | ||
1746 | def post_serve(self): | 1756 | def post_serve(self): |
1747 | self.shutdown(force=True) | 1757 | self.shutdown(force=True, idle=False) |
1748 | prserv.serv.auto_shutdown() | 1758 | prserv.serv.auto_shutdown() |
1749 | if hasattr(bb.parse, "siggen"): | 1759 | if hasattr(bb.parse, "siggen"): |
1750 | bb.parse.siggen.exit() | 1760 | bb.parse.siggen.exit() |
@@ -1754,12 +1764,15 @@ class BBCooker: | |||
1754 | if hasattr(self, "data"): | 1764 | if hasattr(self, "data"): |
1755 | bb.event.fire(CookerExit(), self.data) | 1765 | bb.event.fire(CookerExit(), self.data) |
1756 | 1766 | ||
1757 | def shutdown(self, force = False): | 1767 | def shutdown(self, force=False, idle=True): |
1758 | if force: | 1768 | if force: |
1759 | self.state = state.forceshutdown | 1769 | self.state = state.forceshutdown |
1760 | else: | 1770 | else: |
1761 | self.state = state.shutdown | 1771 | self.state = state.shutdown |
1762 | 1772 | ||
1773 | if idle: | ||
1774 | self.waitIdle(30) | ||
1775 | |||
1763 | if self.parser: | 1776 | if self.parser: |
1764 | self.parser.shutdown(clean=not force) | 1777 | self.parser.shutdown(clean=not force) |
1765 | self.parser.final_cleanup() | 1778 | self.parser.final_cleanup() |
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py index 2aee9ef051..ac7749d36c 100644 --- a/bitbake/lib/bb/server/process.py +++ b/bitbake/lib/bb/server/process.py | |||
@@ -92,8 +92,11 @@ class ProcessServer(): | |||
92 | self.maxuiwait = 30 | 92 | self.maxuiwait = 30 |
93 | self.xmlrpc = False | 93 | self.xmlrpc = False |
94 | 94 | ||
95 | self.idle = None | ||
96 | # Need a lock for _idlefuns changes | ||
95 | self._idlefuns = {} | 97 | self._idlefuns = {} |
96 | self._idlefuncsLock = threading.Lock() | 98 | self._idlefuncsLock = threading.Lock() |
99 | self.idle_cond = threading.Condition(self._idlefuncsLock) | ||
97 | 100 | ||
98 | self.bitbake_lock = lock | 101 | self.bitbake_lock = lock |
99 | self.bitbake_lock_name = lockname | 102 | self.bitbake_lock_name = lockname |
@@ -151,6 +154,12 @@ class ProcessServer(): | |||
151 | 154 | ||
152 | return ret | 155 | return ret |
153 | 156 | ||
157 | def wait_for_idle(self, timeout=30): | ||
158 | # Wait for the idle loop to have cleared | ||
159 | with self.idle_cond: | ||
160 | # FIXME - the 1 is the inotify processing in cooker which always runs | ||
161 | self.idle_cond.wait_for(lambda: len(self._idlefuns) <= 1, timeout) | ||
162 | |||
154 | def main(self): | 163 | def main(self): |
155 | self.cooker.pre_serve() | 164 | self.cooker.pre_serve() |
156 | 165 | ||
@@ -174,6 +183,12 @@ class ProcessServer(): | |||
174 | self.controllersock.close() | 183 | self.controllersock.close() |
175 | self.controllersock = False | 184 | self.controllersock = False |
176 | if self.haveui: | 185 | if self.haveui: |
186 | # Wait for the idle loop to have cleared (30s max) | ||
187 | self.wait_for_idle(30) | ||
188 | if self.cooker.command.currentAsyncCommand is not None: | ||
189 | serverlog("Idle loop didn't finish queued commands after 30s, exiting.") | ||
190 | self.quit = True | ||
191 | |||
177 | fds.remove(self.command_channel) | 192 | fds.remove(self.command_channel) |
178 | bb.event.unregister_UIHhandler(self.event_handle, True) | 193 | bb.event.unregister_UIHhandler(self.event_handle, True) |
179 | self.command_channel_reply.writer.close() | 194 | self.command_channel_reply.writer.close() |
@@ -185,7 +200,7 @@ class ProcessServer(): | |||
185 | self.cooker.clientComplete() | 200 | self.cooker.clientComplete() |
186 | self.haveui = False | 201 | self.haveui = False |
187 | ready = select.select(fds,[],[],0)[0] | 202 | ready = select.select(fds,[],[],0)[0] |
188 | if newconnections: | 203 | if newconnections and not self.quit: |
189 | serverlog("Starting new client") | 204 | serverlog("Starting new client") |
190 | conn = newconnections.pop(-1) | 205 | conn = newconnections.pop(-1) |
191 | fds.append(conn) | 206 | fds.append(conn) |
@@ -257,7 +272,7 @@ class ProcessServer(): | |||
257 | continue | 272 | continue |
258 | try: | 273 | try: |
259 | serverlog("Running command %s" % command) | 274 | serverlog("Running command %s" % command) |
260 | self.command_channel_reply.send(self.cooker.command.runCommand(command)) | 275 | self.command_channel_reply.send(self.cooker.command.runCommand(command, self)) |
261 | serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname)) | 276 | serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname)) |
262 | except Exception as e: | 277 | except Exception as e: |
263 | stack = traceback.format_exc() | 278 | stack = traceback.format_exc() |
@@ -285,6 +300,9 @@ class ProcessServer(): | |||
285 | 300 | ||
286 | ready = self.idle_commands(.1, fds) | 301 | ready = self.idle_commands(.1, fds) |
287 | 302 | ||
303 | if self.idle: | ||
304 | self.idle.join() | ||
305 | |||
288 | serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname)) | 306 | serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname)) |
289 | # Remove the socket file so we don't get any more connections to avoid races | 307 | # Remove the socket file so we don't get any more connections to avoid races |
290 | # The build directory could have been renamed so if the file isn't the one we created | 308 | # The build directory could have been renamed so if the file isn't the one we created |
@@ -300,7 +318,7 @@ class ProcessServer(): | |||
300 | self.sock.close() | 318 | self.sock.close() |
301 | 319 | ||
302 | try: | 320 | try: |
303 | self.cooker.shutdown(True) | 321 | self.cooker.shutdown(True, idle=False) |
304 | self.cooker.notifier.stop() | 322 | self.cooker.notifier.stop() |
305 | self.cooker.confignotifier.stop() | 323 | self.cooker.confignotifier.stop() |
306 | except: | 324 | except: |
@@ -359,47 +377,60 @@ class ProcessServer(): | |||
359 | msg.append(":\n%s" % procs) | 377 | msg.append(":\n%s" % procs) |
360 | serverlog("".join(msg)) | 378 | serverlog("".join(msg)) |
361 | 379 | ||
362 | def idle_commands(self, delay, fds=None): | 380 | def idle_thread(self): |
363 | def remove_idle_func(function): | 381 | def remove_idle_func(function): |
364 | with self._idlefuncsLock: | 382 | with self._idlefuncsLock: |
365 | del self._idlefuns[function] | 383 | del self._idlefuns[function] |
384 | self.idle_cond.notify_all() | ||
366 | 385 | ||
367 | nextsleep = delay | 386 | while not self.quit: |
368 | if not fds: | 387 | nextsleep = 0.1 |
369 | fds = [] | 388 | fds = [] |
370 | 389 | ||
371 | with self._idlefuncsLock: | 390 | with self._idlefuncsLock: |
372 | items = list(self._idlefuns.items()) | 391 | items = list(self._idlefuns.items()) |
373 | 392 | ||
374 | for function, data in items: | 393 | for function, data in items: |
375 | try: | 394 | try: |
376 | retval = function(self, data, False) | 395 | retval = function(self, data, False) |
377 | if isinstance(retval, idleFinish): | 396 | if isinstance(retval, idleFinish): |
378 | serverlog("Removing idle function %s at idleFinish" % str(function)) | 397 | serverlog("Removing idle function %s at idleFinish" % str(function)) |
379 | remove_idle_func(function) | 398 | remove_idle_func(function) |
380 | self.cooker.command.finishAsyncCommand(retval.msg) | 399 | self.cooker.command.finishAsyncCommand(retval.msg) |
381 | nextsleep = None | 400 | nextsleep = None |
382 | elif retval is False: | 401 | elif retval is False: |
383 | serverlog("Removing idle function %s" % str(function)) | 402 | serverlog("Removing idle function %s" % str(function)) |
403 | remove_idle_func(function) | ||
404 | nextsleep = None | ||
405 | elif retval is True: | ||
406 | nextsleep = None | ||
407 | elif isinstance(retval, float) and nextsleep: | ||
408 | if (retval < nextsleep): | ||
409 | nextsleep = retval | ||
410 | elif nextsleep is None: | ||
411 | continue | ||
412 | else: | ||
413 | fds = fds + retval | ||
414 | except SystemExit: | ||
415 | raise | ||
416 | except Exception as exc: | ||
417 | if not isinstance(exc, bb.BBHandledException): | ||
418 | logger.exception('Running idle function') | ||
384 | remove_idle_func(function) | 419 | remove_idle_func(function) |
385 | nextsleep = None | 420 | serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc()) |
386 | elif retval is True: | 421 | self.quit = True |
387 | nextsleep = None | 422 | |
388 | elif isinstance(retval, float) and nextsleep: | 423 | if nextsleep is not None: |
389 | if (retval < nextsleep): | 424 | select.select(fds,[],[],nextsleep)[0] |
390 | nextsleep = retval | 425 | |
391 | elif nextsleep is None: | 426 | def idle_commands(self, delay, fds=None): |
392 | continue | 427 | nextsleep = delay |
393 | else: | 428 | if not fds: |
394 | fds = fds + retval | 429 | fds = [] |
395 | except SystemExit: | 430 | |
396 | raise | 431 | if not self.idle: |
397 | except Exception as exc: | 432 | self.idle = threading.Thread(target=self.idle_thread) |
398 | if not isinstance(exc, bb.BBHandledException): | 433 | self.idle.start() |
399 | logger.exception('Running idle function') | ||
400 | remove_idle_func(function) | ||
401 | serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc()) | ||
402 | self.quit = True | ||
403 | 434 | ||
404 | # Create new heartbeat event? | 435 | # Create new heartbeat event? |
405 | now = time.time() | 436 | now = time.time() |
@@ -592,7 +623,7 @@ def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpc | |||
592 | writer = ConnectionWriter(readypipeinfd) | 623 | writer = ConnectionWriter(readypipeinfd) |
593 | try: | 624 | try: |
594 | featureset = [] | 625 | featureset = [] |
595 | cooker = bb.cooker.BBCooker(featureset, server.register_idle_function) | 626 | cooker = bb.cooker.BBCooker(featureset, server.register_idle_function, server.wait_for_idle) |
596 | cooker.configuration.profile = profile | 627 | cooker.configuration.profile = profile |
597 | except bb.BBHandledException: | 628 | except bb.BBHandledException: |
598 | return None | 629 | return None |
diff --git a/bitbake/lib/bb/server/xmlrpcserver.py b/bitbake/lib/bb/server/xmlrpcserver.py index 01f55538ae..2e65dc34a9 100644 --- a/bitbake/lib/bb/server/xmlrpcserver.py +++ b/bitbake/lib/bb/server/xmlrpcserver.py | |||
@@ -118,7 +118,7 @@ class BitBakeXMLRPCServerCommands(): | |||
118 | """ | 118 | """ |
119 | Run a cooker command on the server | 119 | Run a cooker command on the server |
120 | """ | 120 | """ |
121 | return self.server.cooker.command.runCommand(command, self.server.readonly) | 121 | return self.server.cooker.command.runCommand(command, self.server, self.server.readonly) |
122 | 122 | ||
123 | def getEventHandle(self): | 123 | def getEventHandle(self): |
124 | return self.event_handle | 124 | return self.event_handle |