diff options
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r-- | bitbake/lib/bb/server/process.py | 596 |
1 files changed, 393 insertions, 203 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py index 48da7fe46c..6edb0213ad 100644 --- a/bitbake/lib/bb/server/process.py +++ b/bitbake/lib/bb/server/process.py | |||
@@ -22,128 +22,205 @@ | |||
22 | 22 | ||
23 | import bb | 23 | import bb |
24 | import bb.event | 24 | import bb.event |
25 | import itertools | ||
26 | import logging | 25 | import logging |
27 | import multiprocessing | 26 | import multiprocessing |
27 | import threading | ||
28 | import array | ||
28 | import os | 29 | import os |
29 | import signal | ||
30 | import sys | 30 | import sys |
31 | import time | 31 | import time |
32 | import select | 32 | import select |
33 | from queue import Empty | 33 | import socket |
34 | from multiprocessing import Event, Process, util, Queue, Pipe, queues, Manager | 34 | import subprocess |
35 | import errno | ||
36 | import bb.server.xmlrpcserver | ||
37 | from bb import daemonize | ||
38 | from multiprocessing import queues | ||
35 | 39 | ||
36 | logger = logging.getLogger('BitBake') | 40 | logger = logging.getLogger('BitBake') |
37 | 41 | ||
38 | class ServerCommunicator(): | 42 | class ProcessTimeout(SystemExit): |
39 | def __init__(self, connection, event_handle, server): | 43 | pass |
40 | self.connection = connection | ||
41 | self.event_handle = event_handle | ||
42 | self.server = server | ||
43 | |||
44 | def runCommand(self, command): | ||
45 | # @todo try/except | ||
46 | self.connection.send(command) | ||
47 | |||
48 | if not self.server.is_alive(): | ||
49 | raise SystemExit | ||
50 | |||
51 | while True: | ||
52 | # don't let the user ctrl-c while we're waiting for a response | ||
53 | try: | ||
54 | for idx in range(0,4): # 0, 1, 2, 3 | ||
55 | if self.connection.poll(5): | ||
56 | return self.connection.recv() | ||
57 | else: | ||
58 | bb.warn("Timeout while attempting to communicate with bitbake server") | ||
59 | bb.fatal("Gave up; Too many tries: timeout while attempting to communicate with bitbake server") | ||
60 | except KeyboardInterrupt: | ||
61 | pass | ||
62 | |||
63 | def getEventHandle(self): | ||
64 | handle, error = self.runCommand(["getUIHandlerNum"]) | ||
65 | if error: | ||
66 | logger.error("Unable to get UI Handler Number: %s" % error) | ||
67 | raise BaseException(error) | ||
68 | 44 | ||
69 | return handle | 45 | class ProcessServer(multiprocessing.Process): |
70 | |||
71 | class EventAdapter(): | ||
72 | """ | ||
73 | Adapter to wrap our event queue since the caller (bb.event) expects to | ||
74 | call a send() method, but our actual queue only has put() | ||
75 | """ | ||
76 | def __init__(self, queue): | ||
77 | self.queue = queue | ||
78 | |||
79 | def send(self, event): | ||
80 | try: | ||
81 | self.queue.put(event) | ||
82 | except Exception as err: | ||
83 | print("EventAdapter puked: %s" % str(err)) | ||
84 | |||
85 | |||
86 | class ProcessServer(Process): | ||
87 | profile_filename = "profile.log" | 46 | profile_filename = "profile.log" |
88 | profile_processed_filename = "profile.log.processed" | 47 | profile_processed_filename = "profile.log.processed" |
89 | 48 | ||
90 | def __init__(self, command_channel, event_queue, featurelist): | 49 | def __init__(self, lock, sock, sockname): |
91 | self._idlefuns = {} | 50 | multiprocessing.Process.__init__(self) |
92 | Process.__init__(self) | 51 | self.command_channel = False |
93 | self.command_channel = command_channel | 52 | self.command_channel_reply = False |
94 | self.event_queue = event_queue | ||
95 | self.event = EventAdapter(event_queue) | ||
96 | self.featurelist = featurelist | ||
97 | self.quit = False | 53 | self.quit = False |
98 | self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. | 54 | self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. |
99 | self.next_heartbeat = time.time() | 55 | self.next_heartbeat = time.time() |
100 | 56 | ||
101 | self.quitin, self.quitout = Pipe() | 57 | self.event_handle = None |
102 | self.event_handle = multiprocessing.Value("i") | 58 | self.haveui = False |
59 | self.lastui = False | ||
60 | self.xmlrpc = False | ||
61 | |||
62 | self._idlefuns = {} | ||
63 | |||
64 | self.bitbake_lock = lock | ||
65 | self.sock = sock | ||
66 | self.sockname = sockname | ||
67 | |||
68 | def register_idle_function(self, function, data): | ||
69 | """Register a function to be called while the server is idle""" | ||
70 | assert hasattr(function, '__call__') | ||
71 | self._idlefuns[function] = data | ||
103 | 72 | ||
104 | def run(self): | 73 | def run(self): |
105 | for event in bb.event.ui_queue: | 74 | |
106 | self.event_queue.put(event) | 75 | if self.xmlrpcinterface[0]: |
107 | self.event_handle.value = bb.event.register_UIHhandler(self, True) | 76 | self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self) |
77 | |||
78 | print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port)) | ||
108 | 79 | ||
109 | heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') | 80 | heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') |
110 | if heartbeat_event: | 81 | if heartbeat_event: |
111 | try: | 82 | try: |
112 | self.heartbeat_seconds = float(heartbeat_event) | 83 | self.heartbeat_seconds = float(heartbeat_event) |
113 | except: | 84 | except: |
114 | # Throwing an exception here causes bitbake to hang. | ||
115 | # Just warn about the invalid setting and continue | ||
116 | bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) | 85 | bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) |
86 | |||
87 | self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT') | ||
88 | try: | ||
89 | if self.timeout: | ||
90 | self.timeout = float(self.timeout) | ||
91 | except: | ||
92 | bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) | ||
93 | |||
94 | |||
95 | try: | ||
96 | self.bitbake_lock.seek(0) | ||
97 | self.bitbake_lock.truncate() | ||
98 | if self.xmlrpcinterface[0]: | ||
99 | self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), configuration.interface[0], configuration.interface[1])) | ||
100 | else: | ||
101 | self.bitbake_lock.write("%s\n" % (os.getpid())) | ||
102 | self.bitbake_lock.flush() | ||
103 | except: | ||
104 | pass | ||
105 | |||
117 | bb.cooker.server_main(self.cooker, self.main) | 106 | bb.cooker.server_main(self.cooker, self.main) |
118 | 107 | ||
119 | def main(self): | 108 | def main(self): |
120 | # Ignore SIGINT within the server, as all SIGINT handling is done by | ||
121 | # the UI and communicated to us | ||
122 | self.quitin.close() | ||
123 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
124 | bb.utils.set_process_name("Cooker") | 109 | bb.utils.set_process_name("Cooker") |
110 | |||
111 | ready = [] | ||
112 | |||
113 | self.controllersock = False | ||
114 | fds = [self.sock] | ||
115 | if self.xmlrpc: | ||
116 | fds.append(self.xmlrpc) | ||
125 | while not self.quit: | 117 | while not self.quit: |
126 | try: | 118 | if self.command_channel in ready: |
127 | if self.command_channel.poll(): | 119 | command = self.command_channel.get() |
128 | command = self.command_channel.recv() | 120 | if command[0] == "terminateServer": |
129 | self.runCommand(command) | ||
130 | if self.quitout.poll(): | ||
131 | self.quitout.recv() | ||
132 | self.quit = True | 121 | self.quit = True |
133 | try: | 122 | continue |
134 | self.runCommand(["stateForceShutdown"]) | 123 | try: |
135 | except: | 124 | print("Running command %s" % command) |
136 | pass | 125 | self.command_channel_reply.send(self.cooker.command.runCommand(command)) |
137 | 126 | except Exception as e: | |
138 | self.idle_commands(.1, [self.command_channel, self.quitout]) | 127 | logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e))) |
139 | except Exception: | 128 | |
140 | logger.exception('Running command %s', command) | 129 | if self.xmlrpc in ready: |
130 | self.xmlrpc.handle_requests() | ||
131 | if self.sock in ready: | ||
132 | self.controllersock, address = self.sock.accept() | ||
133 | if self.haveui: | ||
134 | print("Dropping connection attempt as we have a UI %s" % (str(ready))) | ||
135 | self.controllersock.close() | ||
136 | else: | ||
137 | print("Accepting %s" % (str(ready))) | ||
138 | fds.append(self.controllersock) | ||
139 | if self.controllersock in ready: | ||
140 | try: | ||
141 | print("Connecting Client") | ||
142 | ui_fds = recvfds(self.controllersock, 3) | ||
143 | |||
144 | # Where to write events to | ||
145 | writer = ConnectionWriter(ui_fds[0]) | ||
146 | self.event_handle = bb.event.register_UIHhandler(writer, True) | ||
147 | self.event_writer = writer | ||
148 | |||
149 | # Where to read commands from | ||
150 | reader = ConnectionReader(ui_fds[1]) | ||
151 | fds.append(reader) | ||
152 | self.command_channel = reader | ||
153 | |||
154 | # Where to send command return values to | ||
155 | writer = ConnectionWriter(ui_fds[2]) | ||
156 | self.command_channel_reply = writer | ||
157 | |||
158 | self.haveui = True | ||
159 | |||
160 | except EOFError: | ||
161 | print("Disconnecting Client") | ||
162 | fds.remove(self.controllersock) | ||
163 | fds.remove(self.command_channel) | ||
164 | bb.event.unregister_UIHhandler(self.event_handle, True) | ||
165 | self.command_channel_reply.writer.close() | ||
166 | self.event_writer.writer.close() | ||
167 | del self.event_writer | ||
168 | self.controllersock.close() | ||
169 | self.haveui = False | ||
170 | self.lastui = time.time() | ||
171 | self.cooker.clientComplete() | ||
172 | if self.timeout is None: | ||
173 | print("No timeout, exiting.") | ||
174 | self.quit = True | ||
175 | if not self.haveui and self.lastui and self.timeout and (self.lastui + self.timeout) < time.time(): | ||
176 | print("Server timeout, exiting.") | ||
177 | self.quit = True | ||
141 | 178 | ||
142 | self.event_queue.close() | 179 | ready = self.idle_commands(.1, fds) |
143 | bb.event.unregister_UIHhandler(self.event_handle.value, True) | 180 | |
144 | self.command_channel.close() | 181 | print("Exiting") |
145 | self.cooker.shutdown(True) | 182 | try: |
146 | self.quitout.close() | 183 | self.cooker.shutdown(True) |
184 | except: | ||
185 | pass | ||
186 | |||
187 | # Remove the socket file so we don't get any more connections to avoid races | ||
188 | os.unlink(self.sockname) | ||
189 | self.sock.close() | ||
190 | |||
191 | # Finally release the lockfile but warn about other processes holding it open | ||
192 | lock = self.bitbake_lock | ||
193 | lockfile = lock.name | ||
194 | lock.close() | ||
195 | lock = None | ||
196 | |||
197 | while not lock: | ||
198 | with bb.utils.timeout(3): | ||
199 | lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True) | ||
200 | if not lock: | ||
201 | # Some systems may not have lsof available | ||
202 | procs = None | ||
203 | try: | ||
204 | procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) | ||
205 | except OSError as e: | ||
206 | if e.errno != errno.ENOENT: | ||
207 | raise | ||
208 | if procs is None: | ||
209 | # Fall back to fuser if lsof is unavailable | ||
210 | try: | ||
211 | procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) | ||
212 | except OSError as e: | ||
213 | if e.errno != errno.ENOENT: | ||
214 | raise | ||
215 | |||
216 | msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock" | ||
217 | if procs: | ||
218 | msg += ":\n%s" % str(procs) | ||
219 | print(msg) | ||
220 | return | ||
221 | # We hold the lock so we can remove the file (hide stale pid data) | ||
222 | bb.utils.remove(lockfile) | ||
223 | bb.utils.unlockfile(lock) | ||
147 | 224 | ||
148 | def idle_commands(self, delay, fds=None): | 225 | def idle_commands(self, delay, fds=None): |
149 | nextsleep = delay | 226 | nextsleep = delay |
@@ -189,140 +266,253 @@ class ProcessServer(Process): | |||
189 | nextsleep = self.next_heartbeat - now | 266 | nextsleep = self.next_heartbeat - now |
190 | 267 | ||
191 | if nextsleep is not None: | 268 | if nextsleep is not None: |
269 | if self.xmlrpc: | ||
270 | nextsleep = self.xmlrpc.get_timeout(nextsleep) | ||
192 | try: | 271 | try: |
193 | select.select(fds,[],[],nextsleep) | 272 | return select.select(fds,[],[],nextsleep)[0] |
194 | except InterruptedError: | 273 | except InterruptedError: |
195 | # ignore EINTR error, nextsleep only used for wait | 274 | # Ignore EINTR |
196 | # certain time | 275 | return [] |
197 | pass | 276 | else: |
277 | return [] | ||
278 | |||
279 | |||
280 | class ServerCommunicator(): | ||
281 | def __init__(self, connection, recv): | ||
282 | self.connection = connection | ||
283 | self.recv = recv | ||
198 | 284 | ||
199 | def runCommand(self, command): | 285 | def runCommand(self, command): |
200 | """ | ||
201 | Run a cooker command on the server | ||
202 | """ | ||
203 | self.command_channel.send(self.cooker.command.runCommand(command)) | ||
204 | 286 | ||
205 | def stop(self): | 287 | self.connection.send(command) |
206 | self.quitin.send("quit") | 288 | while True: |
207 | self.quitin.close() | 289 | # don't let the user ctrl-c while we're waiting for a response |
290 | try: | ||
291 | for idx in range(0,4): # 0, 1, 2, 3 | ||
292 | if self.recv.poll(1): | ||
293 | return self.recv.get() | ||
294 | else: | ||
295 | bb.warn("Timeout while attempting to communicate with bitbake server") | ||
296 | raise ProcessTimeout("Gave up; Too many tries: timeout while attempting to communicate with bitbake server") | ||
297 | except KeyboardInterrupt: | ||
298 | pass | ||
299 | |||
300 | def updateFeatureSet(self, featureset): | ||
301 | _, error = self.runCommand(["setFeatures", featureset]) | ||
302 | if error: | ||
303 | logger.error("Unable to set the cooker to the correct featureset: %s" % error) | ||
304 | raise BaseException(error) | ||
305 | |||
306 | def getEventHandle(self): | ||
307 | handle, error = self.runCommand(["getUIHandlerNum"]) | ||
308 | if error: | ||
309 | logger.error("Unable to get UI Handler Number: %s" % error) | ||
310 | raise BaseException(error) | ||
208 | 311 | ||
209 | def addcooker(self, cooker): | 312 | return handle |
210 | self.cooker = cooker | ||
211 | 313 | ||
212 | def register_idle_function(self, function, data): | 314 | def terminateServer(self): |
213 | """Register a function to be called while the server is idle""" | 315 | self.connection.send(['terminateServer']) |
214 | assert hasattr(function, '__call__') | 316 | return |
215 | self._idlefuns[function] = data | ||
216 | 317 | ||
217 | class BitBakeProcessServerConnection(object): | 318 | class BitBakeProcessServerConnection(object): |
218 | def __init__(self, serverImpl, ui_channel, event_queue): | 319 | def __init__(self, ui_channel, recv, eq): |
219 | self.procserver = serverImpl | 320 | self.connection = ServerCommunicator(ui_channel, recv) |
220 | self.ui_channel = ui_channel | 321 | self.events = eq |
221 | self.event_queue = event_queue | ||
222 | self.connection = ServerCommunicator(self.ui_channel, self.procserver.event_handle, self.procserver) | ||
223 | self.events = self.event_queue | ||
224 | self.terminated = False | ||
225 | |||
226 | def sigterm_terminate(self): | ||
227 | bb.error("UI received SIGTERM") | ||
228 | self.terminate() | ||
229 | 322 | ||
230 | def terminate(self): | 323 | def terminate(self): |
231 | if self.terminated: | 324 | self.socket_connection.close() |
232 | return | 325 | return |
233 | self.terminated = True | 326 | |
234 | def flushevents(): | 327 | class BitBakeServer(object): |
235 | while True: | 328 | def __init__(self, lock, sockname, configuration, featureset): |
236 | try: | 329 | |
237 | event = self.event_queue.get(block=False) | 330 | self.configuration = configuration |
238 | except (Empty, IOError): | 331 | self.featureset = featureset |
239 | break | 332 | self.sockname = sockname |
240 | if isinstance(event, logging.LogRecord): | 333 | self.bitbake_lock = lock |
241 | logger.handle(event) | 334 | |
242 | 335 | # Create server control socket | |
243 | self.procserver.stop() | 336 | if os.path.exists(sockname): |
244 | 337 | os.unlink(sockname) | |
245 | while self.procserver.is_alive(): | 338 | |
246 | flushevents() | 339 | self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
247 | self.procserver.join(0.1) | 340 | # AF_UNIX has path length issues so chdir here to workaround |
248 | 341 | cwd = os.getcwd() | |
249 | self.ui_channel.close() | ||
250 | self.event_queue.close() | ||
251 | self.event_queue.setexit() | ||
252 | # XXX: Call explicity close in _writer to avoid | ||
253 | # fd leakage because isn't called on Queue.close() | ||
254 | self.event_queue._writer.close() | ||
255 | |||
256 | def setupEventQueue(self): | ||
257 | pass | ||
258 | |||
259 | # Wrap Queue to provide API which isn't server implementation specific | ||
260 | class ProcessEventQueue(multiprocessing.queues.Queue): | ||
261 | def __init__(self, maxsize): | ||
262 | multiprocessing.queues.Queue.__init__(self, maxsize, ctx=multiprocessing.get_context()) | ||
263 | self.exit = False | ||
264 | bb.utils.set_process_name("ProcessEQueue") | ||
265 | |||
266 | def setexit(self): | ||
267 | self.exit = True | ||
268 | |||
269 | def waitEvent(self, timeout): | ||
270 | if self.exit: | ||
271 | return self.getEvent() | ||
272 | try: | 342 | try: |
273 | if not self.server.is_alive(): | 343 | os.chdir(os.path.dirname(sockname)) |
274 | return self.getEvent() | 344 | self.sock.bind(os.path.basename(sockname)) |
275 | if timeout == 0: | 345 | finally: |
276 | return self.get(False) | 346 | os.chdir(cwd) |
277 | return self.get(True, timeout) | 347 | self.sock.listen(1) |
278 | except Empty: | 348 | |
279 | return None | 349 | os.set_inheritable(self.sock.fileno(), True) |
350 | bb.daemonize.createDaemon(self._startServer, "bitbake-cookerdaemon.log") | ||
351 | self.sock.close() | ||
352 | self.bitbake_lock.close() | ||
353 | |||
354 | def _startServer(self): | ||
355 | server = ProcessServer(self.bitbake_lock, self.sock, self.sockname) | ||
356 | self.configuration.setServerRegIdleCallback(server.register_idle_function) | ||
357 | |||
358 | # Copy prefile and postfile to _server variants | ||
359 | for param in ('prefile', 'postfile'): | ||
360 | value = getattr(self.configuration, param) | ||
361 | if value: | ||
362 | setattr(self.configuration, "%s_server" % param, value) | ||
363 | |||
364 | self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset) | ||
365 | server.cooker = self.cooker | ||
366 | server.server_timeout = self.configuration.server_timeout | ||
367 | server.xmlrpcinterface = self.configuration.xmlrpcinterface | ||
368 | print("Started bitbake server pid %d" % os.getpid()) | ||
369 | server.start() | ||
370 | |||
371 | def connectProcessServer(sockname, featureset): | ||
372 | # Connect to socket | ||
373 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||
374 | # AF_UNIX has path length issues so chdir here to workaround | ||
375 | cwd = os.getcwd() | ||
376 | |||
377 | try: | ||
378 | os.chdir(os.path.dirname(sockname)) | ||
379 | sock.connect(os.path.basename(sockname)) | ||
380 | finally: | ||
381 | os.chdir(cwd) | ||
382 | |||
383 | try: | ||
384 | # Send an fd for the remote to write events to | ||
385 | readfd, writefd = os.pipe() | ||
386 | eq = BBUIEventQueue(readfd) | ||
387 | # Send an fd for the remote to recieve commands from | ||
388 | readfd1, writefd1 = os.pipe() | ||
389 | command_chan = ConnectionWriter(writefd1) | ||
390 | # Send an fd for the remote to write commands results to | ||
391 | readfd2, writefd2 = os.pipe() | ||
392 | command_chan_recv = ConnectionReader(readfd2) | ||
393 | |||
394 | sendfds(sock, [writefd, readfd1, writefd2]) | ||
395 | |||
396 | server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq) | ||
397 | |||
398 | server_connection.connection.updateFeatureSet(featureset) | ||
399 | |||
400 | # Save sock so it doesn't get gc'd for the life of our connection | ||
401 | server_connection.socket_connection = sock | ||
402 | except: | ||
403 | sock.close() | ||
404 | raise | ||
405 | |||
406 | return server_connection | ||
407 | |||
408 | def sendfds(sock, fds): | ||
409 | '''Send an array of fds over an AF_UNIX socket.''' | ||
410 | fds = array.array('i', fds) | ||
411 | msg = bytes([len(fds) % 256]) | ||
412 | sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) | ||
413 | |||
414 | def recvfds(sock, size): | ||
415 | '''Receive an array of fds over an AF_UNIX socket.''' | ||
416 | a = array.array('i') | ||
417 | bytes_size = a.itemsize * size | ||
418 | msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size)) | ||
419 | if not msg and not ancdata: | ||
420 | raise EOFError | ||
421 | try: | ||
422 | if len(ancdata) != 1: | ||
423 | raise RuntimeError('received %d items of ancdata' % | ||
424 | len(ancdata)) | ||
425 | cmsg_level, cmsg_type, cmsg_data = ancdata[0] | ||
426 | if (cmsg_level == socket.SOL_SOCKET and | ||
427 | cmsg_type == socket.SCM_RIGHTS): | ||
428 | if len(cmsg_data) % a.itemsize != 0: | ||
429 | raise ValueError | ||
430 | a.frombytes(cmsg_data) | ||
431 | assert len(a) % 256 == msg[0] | ||
432 | return list(a) | ||
433 | except (ValueError, IndexError): | ||
434 | pass | ||
435 | raise RuntimeError('Invalid data received') | ||
436 | |||
437 | class BBUIEventQueue: | ||
438 | def __init__(self, readfd): | ||
439 | |||
440 | self.eventQueue = [] | ||
441 | self.eventQueueLock = threading.Lock() | ||
442 | self.eventQueueNotify = threading.Event() | ||
443 | |||
444 | self.reader = ConnectionReader(readfd) | ||
445 | |||
446 | self.t = threading.Thread() | ||
447 | self.t.setDaemon(True) | ||
448 | self.t.run = self.startCallbackHandler | ||
449 | self.t.start() | ||
280 | 450 | ||
281 | def getEvent(self): | 451 | def getEvent(self): |
282 | try: | 452 | self.eventQueueLock.acquire() |
283 | if not self.server.is_alive(): | 453 | |
284 | self.setexit() | 454 | if len(self.eventQueue) == 0: |
285 | return self.get(False) | 455 | self.eventQueueLock.release() |
286 | except Empty: | ||
287 | if self.exit: | ||
288 | sys.exit(1) | ||
289 | return None | 456 | return None |
290 | 457 | ||
291 | class BitBakeServer(object): | 458 | item = self.eventQueue.pop(0) |
292 | def initServer(self, single_use=True): | ||
293 | # establish communication channels. We use bidirectional pipes for | ||
294 | # ui <--> server command/response pairs | ||
295 | # and a queue for server -> ui event notifications | ||
296 | # | ||
297 | self.ui_channel, self.server_channel = Pipe() | ||
298 | self.event_queue = ProcessEventQueue(0) | ||
299 | self.serverImpl = ProcessServer(self.server_channel, self.event_queue, None) | ||
300 | self.event_queue.server = self.serverImpl | ||
301 | |||
302 | def detach(self): | ||
303 | self.serverImpl.start() | ||
304 | return | ||
305 | 459 | ||
306 | def establishConnection(self, featureset): | 460 | if len(self.eventQueue) == 0: |
461 | self.eventQueueNotify.clear() | ||
307 | 462 | ||
308 | self.connection = BitBakeProcessServerConnection(self.serverImpl, self.ui_channel, self.event_queue) | 463 | self.eventQueueLock.release() |
464 | return item | ||
309 | 465 | ||
310 | _, error = self.connection.connection.runCommand(["setFeatures", featureset]) | 466 | def waitEvent(self, delay): |
311 | if error: | 467 | self.eventQueueNotify.wait(delay) |
312 | logger.error("Unable to set the cooker to the correct featureset: %s" % error) | 468 | return self.getEvent() |
313 | raise BaseException(error) | ||
314 | signal.signal(signal.SIGTERM, lambda i, s: self.connection.sigterm_terminate()) | ||
315 | return self.connection | ||
316 | 469 | ||
317 | def addcooker(self, cooker): | 470 | def queue_event(self, event): |
318 | self.cooker = cooker | 471 | self.eventQueueLock.acquire() |
319 | self.serverImpl.addcooker(cooker) | 472 | self.eventQueue.append(event) |
473 | self.eventQueueNotify.set() | ||
474 | self.eventQueueLock.release() | ||
320 | 475 | ||
321 | def getServerIdleCB(self): | 476 | def send_event(self, event): |
322 | return self.serverImpl.register_idle_function | 477 | self.queue_event(pickle.loads(event)) |
323 | 478 | ||
324 | def saveConnectionDetails(self): | 479 | def startCallbackHandler(self): |
325 | return | 480 | bb.utils.set_process_name("UIEventQueue") |
481 | while True: | ||
482 | self.reader.wait() | ||
483 | event = self.reader.get() | ||
484 | self.queue_event(event) | ||
485 | |||
486 | class ConnectionReader(object): | ||
487 | |||
488 | def __init__(self, fd): | ||
489 | self.reader = multiprocessing.connection.Connection(fd, writable=False) | ||
490 | self.rlock = multiprocessing.Lock() | ||
491 | |||
492 | def wait(self, timeout=None): | ||
493 | return multiprocessing.connection.wait([self.reader], timeout) | ||
494 | |||
495 | def poll(self, timeout=None): | ||
496 | return self.reader.poll(timeout) | ||
497 | |||
498 | def get(self): | ||
499 | with self.rlock: | ||
500 | res = self.reader.recv_bytes() | ||
501 | return multiprocessing.reduction.ForkingPickler.loads(res) | ||
502 | |||
503 | def fileno(self): | ||
504 | return self.reader.fileno() | ||
505 | |||
506 | class ConnectionWriter(object): | ||
507 | |||
508 | def __init__(self, fd): | ||
509 | self.writer = multiprocessing.connection.Connection(fd, readable=False) | ||
510 | self.wlock = multiprocessing.Lock() | ||
511 | # Why bb.event needs this I have no idea | ||
512 | self.event = self | ||
513 | |||
514 | def send(self, obj): | ||
515 | obj = multiprocessing.reduction.ForkingPickler.dumps(obj) | ||
516 | with self.wlock: | ||
517 | self.writer.send_bytes(obj) | ||
326 | 518 | ||
327 | def endSession(self): | ||
328 | self.connection.terminate() | ||