diff options
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
| -rw-r--r-- | bitbake/lib/bb/server/process.py | 596 |
1 files changed, 393 insertions, 203 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py index 48da7fe46c..6edb0213ad 100644 --- a/bitbake/lib/bb/server/process.py +++ b/bitbake/lib/bb/server/process.py | |||
| @@ -22,128 +22,205 @@ | |||
| 22 | 22 | ||
| 23 | import bb | 23 | import bb |
| 24 | import bb.event | 24 | import bb.event |
| 25 | import itertools | ||
| 26 | import logging | 25 | import logging |
| 27 | import multiprocessing | 26 | import multiprocessing |
| 27 | import threading | ||
| 28 | import array | ||
| 28 | import os | 29 | import os |
| 29 | import signal | ||
| 30 | import sys | 30 | import sys |
| 31 | import time | 31 | import time |
| 32 | import select | 32 | import select |
| 33 | from queue import Empty | 33 | import socket |
| 34 | from multiprocessing import Event, Process, util, Queue, Pipe, queues, Manager | 34 | import subprocess |
| 35 | import errno | ||
| 36 | import bb.server.xmlrpcserver | ||
| 37 | from bb import daemonize | ||
| 38 | from multiprocessing import queues | ||
| 35 | 39 | ||
| 36 | logger = logging.getLogger('BitBake') | 40 | logger = logging.getLogger('BitBake') |
| 37 | 41 | ||
| 38 | class ServerCommunicator(): | 42 | class ProcessTimeout(SystemExit): |
| 39 | def __init__(self, connection, event_handle, server): | 43 | pass |
| 40 | self.connection = connection | ||
| 41 | self.event_handle = event_handle | ||
| 42 | self.server = server | ||
| 43 | |||
| 44 | def runCommand(self, command): | ||
| 45 | # @todo try/except | ||
| 46 | self.connection.send(command) | ||
| 47 | |||
| 48 | if not self.server.is_alive(): | ||
| 49 | raise SystemExit | ||
| 50 | |||
| 51 | while True: | ||
| 52 | # don't let the user ctrl-c while we're waiting for a response | ||
| 53 | try: | ||
| 54 | for idx in range(0,4): # 0, 1, 2, 3 | ||
| 55 | if self.connection.poll(5): | ||
| 56 | return self.connection.recv() | ||
| 57 | else: | ||
| 58 | bb.warn("Timeout while attempting to communicate with bitbake server") | ||
| 59 | bb.fatal("Gave up; Too many tries: timeout while attempting to communicate with bitbake server") | ||
| 60 | except KeyboardInterrupt: | ||
| 61 | pass | ||
| 62 | |||
| 63 | def getEventHandle(self): | ||
| 64 | handle, error = self.runCommand(["getUIHandlerNum"]) | ||
| 65 | if error: | ||
| 66 | logger.error("Unable to get UI Handler Number: %s" % error) | ||
| 67 | raise BaseException(error) | ||
| 68 | 44 | ||
| 69 | return handle | 45 | class ProcessServer(multiprocessing.Process): |
| 70 | |||
| 71 | class EventAdapter(): | ||
| 72 | """ | ||
| 73 | Adapter to wrap our event queue since the caller (bb.event) expects to | ||
| 74 | call a send() method, but our actual queue only has put() | ||
| 75 | """ | ||
| 76 | def __init__(self, queue): | ||
| 77 | self.queue = queue | ||
| 78 | |||
| 79 | def send(self, event): | ||
| 80 | try: | ||
| 81 | self.queue.put(event) | ||
| 82 | except Exception as err: | ||
| 83 | print("EventAdapter puked: %s" % str(err)) | ||
| 84 | |||
| 85 | |||
| 86 | class ProcessServer(Process): | ||
| 87 | profile_filename = "profile.log" | 46 | profile_filename = "profile.log" |
| 88 | profile_processed_filename = "profile.log.processed" | 47 | profile_processed_filename = "profile.log.processed" |
| 89 | 48 | ||
| 90 | def __init__(self, command_channel, event_queue, featurelist): | 49 | def __init__(self, lock, sock, sockname): |
| 91 | self._idlefuns = {} | 50 | multiprocessing.Process.__init__(self) |
| 92 | Process.__init__(self) | 51 | self.command_channel = False |
| 93 | self.command_channel = command_channel | 52 | self.command_channel_reply = False |
| 94 | self.event_queue = event_queue | ||
| 95 | self.event = EventAdapter(event_queue) | ||
| 96 | self.featurelist = featurelist | ||
| 97 | self.quit = False | 53 | self.quit = False |
| 98 | self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. | 54 | self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. |
| 99 | self.next_heartbeat = time.time() | 55 | self.next_heartbeat = time.time() |
| 100 | 56 | ||
| 101 | self.quitin, self.quitout = Pipe() | 57 | self.event_handle = None |
| 102 | self.event_handle = multiprocessing.Value("i") | 58 | self.haveui = False |
| 59 | self.lastui = False | ||
| 60 | self.xmlrpc = False | ||
| 61 | |||
| 62 | self._idlefuns = {} | ||
| 63 | |||
| 64 | self.bitbake_lock = lock | ||
| 65 | self.sock = sock | ||
| 66 | self.sockname = sockname | ||
| 67 | |||
| 68 | def register_idle_function(self, function, data): | ||
| 69 | """Register a function to be called while the server is idle""" | ||
| 70 | assert hasattr(function, '__call__') | ||
| 71 | self._idlefuns[function] = data | ||
| 103 | 72 | ||
| 104 | def run(self): | 73 | def run(self): |
| 105 | for event in bb.event.ui_queue: | 74 | |
| 106 | self.event_queue.put(event) | 75 | if self.xmlrpcinterface[0]: |
| 107 | self.event_handle.value = bb.event.register_UIHhandler(self, True) | 76 | self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self) |
| 77 | |||
| 78 | print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port)) | ||
| 108 | 79 | ||
| 109 | heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') | 80 | heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') |
| 110 | if heartbeat_event: | 81 | if heartbeat_event: |
| 111 | try: | 82 | try: |
| 112 | self.heartbeat_seconds = float(heartbeat_event) | 83 | self.heartbeat_seconds = float(heartbeat_event) |
| 113 | except: | 84 | except: |
| 114 | # Throwing an exception here causes bitbake to hang. | ||
| 115 | # Just warn about the invalid setting and continue | ||
| 116 | bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) | 85 | bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) |
| 86 | |||
| 87 | self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT') | ||
| 88 | try: | ||
| 89 | if self.timeout: | ||
| 90 | self.timeout = float(self.timeout) | ||
| 91 | except: | ||
| 92 | bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) | ||
| 93 | |||
| 94 | |||
| 95 | try: | ||
| 96 | self.bitbake_lock.seek(0) | ||
| 97 | self.bitbake_lock.truncate() | ||
| 98 | if self.xmlrpcinterface[0]: | ||
| 99 | self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), configuration.interface[0], configuration.interface[1])) | ||
| 100 | else: | ||
| 101 | self.bitbake_lock.write("%s\n" % (os.getpid())) | ||
| 102 | self.bitbake_lock.flush() | ||
| 103 | except: | ||
| 104 | pass | ||
| 105 | |||
| 117 | bb.cooker.server_main(self.cooker, self.main) | 106 | bb.cooker.server_main(self.cooker, self.main) |
| 118 | 107 | ||
| 119 | def main(self): | 108 | def main(self): |
| 120 | # Ignore SIGINT within the server, as all SIGINT handling is done by | ||
| 121 | # the UI and communicated to us | ||
| 122 | self.quitin.close() | ||
| 123 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
| 124 | bb.utils.set_process_name("Cooker") | 109 | bb.utils.set_process_name("Cooker") |
| 110 | |||
| 111 | ready = [] | ||
| 112 | |||
| 113 | self.controllersock = False | ||
| 114 | fds = [self.sock] | ||
| 115 | if self.xmlrpc: | ||
| 116 | fds.append(self.xmlrpc) | ||
| 125 | while not self.quit: | 117 | while not self.quit: |
| 126 | try: | 118 | if self.command_channel in ready: |
| 127 | if self.command_channel.poll(): | 119 | command = self.command_channel.get() |
| 128 | command = self.command_channel.recv() | 120 | if command[0] == "terminateServer": |
| 129 | self.runCommand(command) | ||
| 130 | if self.quitout.poll(): | ||
| 131 | self.quitout.recv() | ||
| 132 | self.quit = True | 121 | self.quit = True |
| 133 | try: | 122 | continue |
| 134 | self.runCommand(["stateForceShutdown"]) | 123 | try: |
| 135 | except: | 124 | print("Running command %s" % command) |
| 136 | pass | 125 | self.command_channel_reply.send(self.cooker.command.runCommand(command)) |
| 137 | 126 | except Exception as e: | |
| 138 | self.idle_commands(.1, [self.command_channel, self.quitout]) | 127 | logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e))) |
| 139 | except Exception: | 128 | |
| 140 | logger.exception('Running command %s', command) | 129 | if self.xmlrpc in ready: |
| 130 | self.xmlrpc.handle_requests() | ||
| 131 | if self.sock in ready: | ||
| 132 | self.controllersock, address = self.sock.accept() | ||
| 133 | if self.haveui: | ||
| 134 | print("Dropping connection attempt as we have a UI %s" % (str(ready))) | ||
| 135 | self.controllersock.close() | ||
| 136 | else: | ||
| 137 | print("Accepting %s" % (str(ready))) | ||
| 138 | fds.append(self.controllersock) | ||
| 139 | if self.controllersock in ready: | ||
| 140 | try: | ||
| 141 | print("Connecting Client") | ||
| 142 | ui_fds = recvfds(self.controllersock, 3) | ||
| 143 | |||
| 144 | # Where to write events to | ||
| 145 | writer = ConnectionWriter(ui_fds[0]) | ||
| 146 | self.event_handle = bb.event.register_UIHhandler(writer, True) | ||
| 147 | self.event_writer = writer | ||
| 148 | |||
| 149 | # Where to read commands from | ||
| 150 | reader = ConnectionReader(ui_fds[1]) | ||
| 151 | fds.append(reader) | ||
| 152 | self.command_channel = reader | ||
| 153 | |||
| 154 | # Where to send command return values to | ||
| 155 | writer = ConnectionWriter(ui_fds[2]) | ||
| 156 | self.command_channel_reply = writer | ||
| 157 | |||
| 158 | self.haveui = True | ||
| 159 | |||
| 160 | except EOFError: | ||
| 161 | print("Disconnecting Client") | ||
| 162 | fds.remove(self.controllersock) | ||
| 163 | fds.remove(self.command_channel) | ||
| 164 | bb.event.unregister_UIHhandler(self.event_handle, True) | ||
| 165 | self.command_channel_reply.writer.close() | ||
| 166 | self.event_writer.writer.close() | ||
| 167 | del self.event_writer | ||
| 168 | self.controllersock.close() | ||
| 169 | self.haveui = False | ||
| 170 | self.lastui = time.time() | ||
| 171 | self.cooker.clientComplete() | ||
| 172 | if self.timeout is None: | ||
| 173 | print("No timeout, exiting.") | ||
| 174 | self.quit = True | ||
| 175 | if not self.haveui and self.lastui and self.timeout and (self.lastui + self.timeout) < time.time(): | ||
| 176 | print("Server timeout, exiting.") | ||
| 177 | self.quit = True | ||
| 141 | 178 | ||
| 142 | self.event_queue.close() | 179 | ready = self.idle_commands(.1, fds) |
| 143 | bb.event.unregister_UIHhandler(self.event_handle.value, True) | 180 | |
| 144 | self.command_channel.close() | 181 | print("Exiting") |
| 145 | self.cooker.shutdown(True) | 182 | try: |
| 146 | self.quitout.close() | 183 | self.cooker.shutdown(True) |
| 184 | except: | ||
| 185 | pass | ||
| 186 | |||
| 187 | # Remove the socket file so we don't get any more connections to avoid races | ||
| 188 | os.unlink(self.sockname) | ||
| 189 | self.sock.close() | ||
| 190 | |||
| 191 | # Finally release the lockfile but warn about other processes holding it open | ||
| 192 | lock = self.bitbake_lock | ||
| 193 | lockfile = lock.name | ||
| 194 | lock.close() | ||
| 195 | lock = None | ||
| 196 | |||
| 197 | while not lock: | ||
| 198 | with bb.utils.timeout(3): | ||
| 199 | lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True) | ||
| 200 | if not lock: | ||
| 201 | # Some systems may not have lsof available | ||
| 202 | procs = None | ||
| 203 | try: | ||
| 204 | procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) | ||
| 205 | except OSError as e: | ||
| 206 | if e.errno != errno.ENOENT: | ||
| 207 | raise | ||
| 208 | if procs is None: | ||
| 209 | # Fall back to fuser if lsof is unavailable | ||
| 210 | try: | ||
| 211 | procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) | ||
| 212 | except OSError as e: | ||
| 213 | if e.errno != errno.ENOENT: | ||
| 214 | raise | ||
| 215 | |||
| 216 | msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock" | ||
| 217 | if procs: | ||
| 218 | msg += ":\n%s" % str(procs) | ||
| 219 | print(msg) | ||
| 220 | return | ||
| 221 | # We hold the lock so we can remove the file (hide stale pid data) | ||
| 222 | bb.utils.remove(lockfile) | ||
| 223 | bb.utils.unlockfile(lock) | ||
| 147 | 224 | ||
| 148 | def idle_commands(self, delay, fds=None): | 225 | def idle_commands(self, delay, fds=None): |
| 149 | nextsleep = delay | 226 | nextsleep = delay |
| @@ -189,140 +266,253 @@ class ProcessServer(Process): | |||
| 189 | nextsleep = self.next_heartbeat - now | 266 | nextsleep = self.next_heartbeat - now |
| 190 | 267 | ||
| 191 | if nextsleep is not None: | 268 | if nextsleep is not None: |
| 269 | if self.xmlrpc: | ||
| 270 | nextsleep = self.xmlrpc.get_timeout(nextsleep) | ||
| 192 | try: | 271 | try: |
| 193 | select.select(fds,[],[],nextsleep) | 272 | return select.select(fds,[],[],nextsleep)[0] |
| 194 | except InterruptedError: | 273 | except InterruptedError: |
| 195 | # ignore EINTR error, nextsleep only used for wait | 274 | # Ignore EINTR |
| 196 | # certain time | 275 | return [] |
| 197 | pass | 276 | else: |
| 277 | return [] | ||
| 278 | |||
| 279 | |||
| 280 | class ServerCommunicator(): | ||
| 281 | def __init__(self, connection, recv): | ||
| 282 | self.connection = connection | ||
| 283 | self.recv = recv | ||
| 198 | 284 | ||
| 199 | def runCommand(self, command): | 285 | def runCommand(self, command): |
| 200 | """ | ||
| 201 | Run a cooker command on the server | ||
| 202 | """ | ||
| 203 | self.command_channel.send(self.cooker.command.runCommand(command)) | ||
| 204 | 286 | ||
| 205 | def stop(self): | 287 | self.connection.send(command) |
| 206 | self.quitin.send("quit") | 288 | while True: |
| 207 | self.quitin.close() | 289 | # don't let the user ctrl-c while we're waiting for a response |
| 290 | try: | ||
| 291 | for idx in range(0,4): # 0, 1, 2, 3 | ||
| 292 | if self.recv.poll(1): | ||
| 293 | return self.recv.get() | ||
| 294 | else: | ||
| 295 | bb.warn("Timeout while attempting to communicate with bitbake server") | ||
| 296 | raise ProcessTimeout("Gave up; Too many tries: timeout while attempting to communicate with bitbake server") | ||
| 297 | except KeyboardInterrupt: | ||
| 298 | pass | ||
| 299 | |||
| 300 | def updateFeatureSet(self, featureset): | ||
| 301 | _, error = self.runCommand(["setFeatures", featureset]) | ||
| 302 | if error: | ||
| 303 | logger.error("Unable to set the cooker to the correct featureset: %s" % error) | ||
| 304 | raise BaseException(error) | ||
| 305 | |||
| 306 | def getEventHandle(self): | ||
| 307 | handle, error = self.runCommand(["getUIHandlerNum"]) | ||
| 308 | if error: | ||
| 309 | logger.error("Unable to get UI Handler Number: %s" % error) | ||
| 310 | raise BaseException(error) | ||
| 208 | 311 | ||
| 209 | def addcooker(self, cooker): | 312 | return handle |
| 210 | self.cooker = cooker | ||
| 211 | 313 | ||
| 212 | def register_idle_function(self, function, data): | 314 | def terminateServer(self): |
| 213 | """Register a function to be called while the server is idle""" | 315 | self.connection.send(['terminateServer']) |
| 214 | assert hasattr(function, '__call__') | 316 | return |
| 215 | self._idlefuns[function] = data | ||
| 216 | 317 | ||
| 217 | class BitBakeProcessServerConnection(object): | 318 | class BitBakeProcessServerConnection(object): |
| 218 | def __init__(self, serverImpl, ui_channel, event_queue): | 319 | def __init__(self, ui_channel, recv, eq): |
| 219 | self.procserver = serverImpl | 320 | self.connection = ServerCommunicator(ui_channel, recv) |
| 220 | self.ui_channel = ui_channel | 321 | self.events = eq |
| 221 | self.event_queue = event_queue | ||
| 222 | self.connection = ServerCommunicator(self.ui_channel, self.procserver.event_handle, self.procserver) | ||
| 223 | self.events = self.event_queue | ||
| 224 | self.terminated = False | ||
| 225 | |||
| 226 | def sigterm_terminate(self): | ||
| 227 | bb.error("UI received SIGTERM") | ||
| 228 | self.terminate() | ||
| 229 | 322 | ||
| 230 | def terminate(self): | 323 | def terminate(self): |
| 231 | if self.terminated: | 324 | self.socket_connection.close() |
| 232 | return | 325 | return |
| 233 | self.terminated = True | 326 | |
| 234 | def flushevents(): | 327 | class BitBakeServer(object): |
| 235 | while True: | 328 | def __init__(self, lock, sockname, configuration, featureset): |
| 236 | try: | 329 | |
| 237 | event = self.event_queue.get(block=False) | 330 | self.configuration = configuration |
| 238 | except (Empty, IOError): | 331 | self.featureset = featureset |
| 239 | break | 332 | self.sockname = sockname |
| 240 | if isinstance(event, logging.LogRecord): | 333 | self.bitbake_lock = lock |
| 241 | logger.handle(event) | 334 | |
| 242 | 335 | # Create server control socket | |
| 243 | self.procserver.stop() | 336 | if os.path.exists(sockname): |
| 244 | 337 | os.unlink(sockname) | |
| 245 | while self.procserver.is_alive(): | 338 | |
| 246 | flushevents() | 339 | self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
| 247 | self.procserver.join(0.1) | 340 | # AF_UNIX has path length issues so chdir here to workaround |
| 248 | 341 | cwd = os.getcwd() | |
| 249 | self.ui_channel.close() | ||
| 250 | self.event_queue.close() | ||
| 251 | self.event_queue.setexit() | ||
| 252 | # XXX: Call explicity close in _writer to avoid | ||
| 253 | # fd leakage because isn't called on Queue.close() | ||
| 254 | self.event_queue._writer.close() | ||
| 255 | |||
| 256 | def setupEventQueue(self): | ||
| 257 | pass | ||
| 258 | |||
| 259 | # Wrap Queue to provide API which isn't server implementation specific | ||
| 260 | class ProcessEventQueue(multiprocessing.queues.Queue): | ||
| 261 | def __init__(self, maxsize): | ||
| 262 | multiprocessing.queues.Queue.__init__(self, maxsize, ctx=multiprocessing.get_context()) | ||
| 263 | self.exit = False | ||
| 264 | bb.utils.set_process_name("ProcessEQueue") | ||
| 265 | |||
| 266 | def setexit(self): | ||
| 267 | self.exit = True | ||
| 268 | |||
| 269 | def waitEvent(self, timeout): | ||
| 270 | if self.exit: | ||
| 271 | return self.getEvent() | ||
| 272 | try: | 342 | try: |
| 273 | if not self.server.is_alive(): | 343 | os.chdir(os.path.dirname(sockname)) |
| 274 | return self.getEvent() | 344 | self.sock.bind(os.path.basename(sockname)) |
| 275 | if timeout == 0: | 345 | finally: |
| 276 | return self.get(False) | 346 | os.chdir(cwd) |
| 277 | return self.get(True, timeout) | 347 | self.sock.listen(1) |
| 278 | except Empty: | 348 | |
| 279 | return None | 349 | os.set_inheritable(self.sock.fileno(), True) |
| 350 | bb.daemonize.createDaemon(self._startServer, "bitbake-cookerdaemon.log") | ||
| 351 | self.sock.close() | ||
| 352 | self.bitbake_lock.close() | ||
| 353 | |||
| 354 | def _startServer(self): | ||
| 355 | server = ProcessServer(self.bitbake_lock, self.sock, self.sockname) | ||
| 356 | self.configuration.setServerRegIdleCallback(server.register_idle_function) | ||
| 357 | |||
| 358 | # Copy prefile and postfile to _server variants | ||
| 359 | for param in ('prefile', 'postfile'): | ||
| 360 | value = getattr(self.configuration, param) | ||
| 361 | if value: | ||
| 362 | setattr(self.configuration, "%s_server" % param, value) | ||
| 363 | |||
| 364 | self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset) | ||
| 365 | server.cooker = self.cooker | ||
| 366 | server.server_timeout = self.configuration.server_timeout | ||
| 367 | server.xmlrpcinterface = self.configuration.xmlrpcinterface | ||
| 368 | print("Started bitbake server pid %d" % os.getpid()) | ||
| 369 | server.start() | ||
| 370 | |||
| 371 | def connectProcessServer(sockname, featureset): | ||
| 372 | # Connect to socket | ||
| 373 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||
| 374 | # AF_UNIX has path length issues so chdir here to workaround | ||
| 375 | cwd = os.getcwd() | ||
| 376 | |||
| 377 | try: | ||
| 378 | os.chdir(os.path.dirname(sockname)) | ||
| 379 | sock.connect(os.path.basename(sockname)) | ||
| 380 | finally: | ||
| 381 | os.chdir(cwd) | ||
| 382 | |||
| 383 | try: | ||
| 384 | # Send an fd for the remote to write events to | ||
| 385 | readfd, writefd = os.pipe() | ||
| 386 | eq = BBUIEventQueue(readfd) | ||
| 387 | # Send an fd for the remote to recieve commands from | ||
| 388 | readfd1, writefd1 = os.pipe() | ||
| 389 | command_chan = ConnectionWriter(writefd1) | ||
| 390 | # Send an fd for the remote to write commands results to | ||
| 391 | readfd2, writefd2 = os.pipe() | ||
| 392 | command_chan_recv = ConnectionReader(readfd2) | ||
| 393 | |||
| 394 | sendfds(sock, [writefd, readfd1, writefd2]) | ||
| 395 | |||
| 396 | server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq) | ||
| 397 | |||
| 398 | server_connection.connection.updateFeatureSet(featureset) | ||
| 399 | |||
| 400 | # Save sock so it doesn't get gc'd for the life of our connection | ||
| 401 | server_connection.socket_connection = sock | ||
| 402 | except: | ||
| 403 | sock.close() | ||
| 404 | raise | ||
| 405 | |||
| 406 | return server_connection | ||
| 407 | |||
| 408 | def sendfds(sock, fds): | ||
| 409 | '''Send an array of fds over an AF_UNIX socket.''' | ||
| 410 | fds = array.array('i', fds) | ||
| 411 | msg = bytes([len(fds) % 256]) | ||
| 412 | sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) | ||
| 413 | |||
| 414 | def recvfds(sock, size): | ||
| 415 | '''Receive an array of fds over an AF_UNIX socket.''' | ||
| 416 | a = array.array('i') | ||
| 417 | bytes_size = a.itemsize * size | ||
| 418 | msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size)) | ||
| 419 | if not msg and not ancdata: | ||
| 420 | raise EOFError | ||
| 421 | try: | ||
| 422 | if len(ancdata) != 1: | ||
| 423 | raise RuntimeError('received %d items of ancdata' % | ||
| 424 | len(ancdata)) | ||
| 425 | cmsg_level, cmsg_type, cmsg_data = ancdata[0] | ||
| 426 | if (cmsg_level == socket.SOL_SOCKET and | ||
| 427 | cmsg_type == socket.SCM_RIGHTS): | ||
| 428 | if len(cmsg_data) % a.itemsize != 0: | ||
| 429 | raise ValueError | ||
| 430 | a.frombytes(cmsg_data) | ||
| 431 | assert len(a) % 256 == msg[0] | ||
| 432 | return list(a) | ||
| 433 | except (ValueError, IndexError): | ||
| 434 | pass | ||
| 435 | raise RuntimeError('Invalid data received') | ||
| 436 | |||
| 437 | class BBUIEventQueue: | ||
| 438 | def __init__(self, readfd): | ||
| 439 | |||
| 440 | self.eventQueue = [] | ||
| 441 | self.eventQueueLock = threading.Lock() | ||
| 442 | self.eventQueueNotify = threading.Event() | ||
| 443 | |||
| 444 | self.reader = ConnectionReader(readfd) | ||
| 445 | |||
| 446 | self.t = threading.Thread() | ||
| 447 | self.t.setDaemon(True) | ||
| 448 | self.t.run = self.startCallbackHandler | ||
| 449 | self.t.start() | ||
| 280 | 450 | ||
| 281 | def getEvent(self): | 451 | def getEvent(self): |
| 282 | try: | 452 | self.eventQueueLock.acquire() |
| 283 | if not self.server.is_alive(): | 453 | |
| 284 | self.setexit() | 454 | if len(self.eventQueue) == 0: |
| 285 | return self.get(False) | 455 | self.eventQueueLock.release() |
| 286 | except Empty: | ||
| 287 | if self.exit: | ||
| 288 | sys.exit(1) | ||
| 289 | return None | 456 | return None |
| 290 | 457 | ||
| 291 | class BitBakeServer(object): | 458 | item = self.eventQueue.pop(0) |
| 292 | def initServer(self, single_use=True): | ||
| 293 | # establish communication channels. We use bidirectional pipes for | ||
| 294 | # ui <--> server command/response pairs | ||
| 295 | # and a queue for server -> ui event notifications | ||
| 296 | # | ||
| 297 | self.ui_channel, self.server_channel = Pipe() | ||
| 298 | self.event_queue = ProcessEventQueue(0) | ||
| 299 | self.serverImpl = ProcessServer(self.server_channel, self.event_queue, None) | ||
| 300 | self.event_queue.server = self.serverImpl | ||
| 301 | |||
| 302 | def detach(self): | ||
| 303 | self.serverImpl.start() | ||
| 304 | return | ||
| 305 | 459 | ||
| 306 | def establishConnection(self, featureset): | 460 | if len(self.eventQueue) == 0: |
| 461 | self.eventQueueNotify.clear() | ||
| 307 | 462 | ||
| 308 | self.connection = BitBakeProcessServerConnection(self.serverImpl, self.ui_channel, self.event_queue) | 463 | self.eventQueueLock.release() |
| 464 | return item | ||
| 309 | 465 | ||
| 310 | _, error = self.connection.connection.runCommand(["setFeatures", featureset]) | 466 | def waitEvent(self, delay): |
| 311 | if error: | 467 | self.eventQueueNotify.wait(delay) |
| 312 | logger.error("Unable to set the cooker to the correct featureset: %s" % error) | 468 | return self.getEvent() |
| 313 | raise BaseException(error) | ||
| 314 | signal.signal(signal.SIGTERM, lambda i, s: self.connection.sigterm_terminate()) | ||
| 315 | return self.connection | ||
| 316 | 469 | ||
| 317 | def addcooker(self, cooker): | 470 | def queue_event(self, event): |
| 318 | self.cooker = cooker | 471 | self.eventQueueLock.acquire() |
| 319 | self.serverImpl.addcooker(cooker) | 472 | self.eventQueue.append(event) |
| 473 | self.eventQueueNotify.set() | ||
| 474 | self.eventQueueLock.release() | ||
| 320 | 475 | ||
| 321 | def getServerIdleCB(self): | 476 | def send_event(self, event): |
| 322 | return self.serverImpl.register_idle_function | 477 | self.queue_event(pickle.loads(event)) |
| 323 | 478 | ||
| 324 | def saveConnectionDetails(self): | 479 | def startCallbackHandler(self): |
| 325 | return | 480 | bb.utils.set_process_name("UIEventQueue") |
| 481 | while True: | ||
| 482 | self.reader.wait() | ||
| 483 | event = self.reader.get() | ||
| 484 | self.queue_event(event) | ||
| 485 | |||
| 486 | class ConnectionReader(object): | ||
| 487 | |||
| 488 | def __init__(self, fd): | ||
| 489 | self.reader = multiprocessing.connection.Connection(fd, writable=False) | ||
| 490 | self.rlock = multiprocessing.Lock() | ||
| 491 | |||
| 492 | def wait(self, timeout=None): | ||
| 493 | return multiprocessing.connection.wait([self.reader], timeout) | ||
| 494 | |||
| 495 | def poll(self, timeout=None): | ||
| 496 | return self.reader.poll(timeout) | ||
| 497 | |||
| 498 | def get(self): | ||
| 499 | with self.rlock: | ||
| 500 | res = self.reader.recv_bytes() | ||
| 501 | return multiprocessing.reduction.ForkingPickler.loads(res) | ||
| 502 | |||
| 503 | def fileno(self): | ||
| 504 | return self.reader.fileno() | ||
| 505 | |||
| 506 | class ConnectionWriter(object): | ||
| 507 | |||
| 508 | def __init__(self, fd): | ||
| 509 | self.writer = multiprocessing.connection.Connection(fd, readable=False) | ||
| 510 | self.wlock = multiprocessing.Lock() | ||
| 511 | # Why bb.event needs this I have no idea | ||
| 512 | self.event = self | ||
| 513 | |||
| 514 | def send(self, obj): | ||
| 515 | obj = multiprocessing.reduction.ForkingPickler.dumps(obj) | ||
| 516 | with self.wlock: | ||
| 517 | self.writer.send_bytes(obj) | ||
| 326 | 518 | ||
| 327 | def endSession(self): | ||
| 328 | self.connection.terminate() | ||
