summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/server/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r--bitbake/lib/bb/server/process.py854
1 files changed, 0 insertions, 854 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py
deleted file mode 100644
index d0f73590cc..0000000000
--- a/bitbake/lib/bb/server/process.py
+++ /dev/null
@@ -1,854 +0,0 @@
1#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# SPDX-License-Identifier: GPL-2.0-only
7#
8
9"""
10 This module implements a multiprocessing.Process based server for bitbake.
11"""
12
13import bb
14import bb.event
15import logging
16from bb import multiprocessing
17import threading
18import array
19import os
20import sys
21import time
22import select
23import socket
24import subprocess
25import errno
26import re
27import datetime
28import pickle
29import traceback
30import gc
31import stat
32import bb.server.xmlrpcserver
33from bb import daemonize
34from multiprocessing import queues
35
36logger = logging.getLogger('BitBake')
37
38class ProcessTimeout(SystemExit):
39 pass
40
41def currenttime():
42 return datetime.datetime.now().strftime('%H:%M:%S.%f')
43
44def serverlog(msg):
45 print(str(os.getpid()) + " " + currenttime() + " " + msg)
46 #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server
47 #sys.stdout.flush()
48
49#
50# When we have lockfile issues, try and find infomation about which process is
51# using the lockfile
52#
53def get_lockfile_process_msg(lockfile):
54 # Some systems may not have lsof available
55 procs = None
56 try:
57 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
58 except subprocess.CalledProcessError:
59 # File was deleted?
60 pass
61 except OSError as e:
62 if e.errno != errno.ENOENT:
63 raise
64 if procs is None:
65 # Fall back to fuser if lsof is unavailable
66 try:
67 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
68 except subprocess.CalledProcessError:
69 # File was deleted?
70 pass
71 except OSError as e:
72 if e.errno != errno.ENOENT:
73 raise
74 if procs:
75 return procs.decode("utf-8")
76 return None
77
78class idleFinish():
79 def __init__(self, msg):
80 self.msg = msg
81
82class ProcessServer():
83 def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
84 self.command_channel = False
85 self.command_channel_reply = False
86 self.quit = False
87 self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
88 self.next_heartbeat = time.time()
89
90 self.event_handle = None
91 self.hadanyui = False
92 self.haveui = False
93 self.maxuiwait = 30
94 self.xmlrpc = False
95
96 self.idle = None
97 # Need a lock for _idlefuns changes
98 self._idlefuns = {}
99 self._idlefuncsLock = threading.Lock()
100 self.idle_cond = threading.Condition(self._idlefuncsLock)
101
102 self.bitbake_lock = lock
103 self.bitbake_lock_name = lockname
104 self.sock = sock
105 self.sockname = sockname
106 # It is possible the directory may be renamed. Cache the inode of the socket file
107 # so we can tell if things changed.
108 self.sockinode = os.stat(self.sockname)[stat.ST_INO]
109
110 self.server_timeout = server_timeout
111 self.timeout = self.server_timeout
112 self.xmlrpcinterface = xmlrpcinterface
113
114 def register_idle_function(self, function, data):
115 """Register a function to be called while the server is idle"""
116 assert hasattr(function, '__call__')
117 with bb.utils.lock_timeout(self._idlefuncsLock):
118 self._idlefuns[function] = data
119 serverlog("Registering idle function %s" % str(function))
120
121 def run(self):
122
123 if self.xmlrpcinterface[0]:
124 self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
125
126 serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
127
128 try:
129 self.bitbake_lock.seek(0)
130 self.bitbake_lock.truncate()
131 if self.xmlrpc:
132 self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
133 else:
134 self.bitbake_lock.write("%s\n" % (os.getpid()))
135 self.bitbake_lock.flush()
136 except Exception as e:
137 serverlog("Error writing to lock file: %s" % str(e))
138 pass
139
140 return bb.utils.profile_function("main" in self.cooker.configuration.profile, self.main, "profile-mainloop.log")
141
142 def _idle_check(self):
143 return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None
144
145 def wait_for_idle(self, timeout=30):
146 # Wait for the idle loop to have cleared
147 with bb.utils.lock_timeout(self._idlefuncsLock):
148 return self.idle_cond.wait_for(self._idle_check, timeout) is not False
149
150 def set_async_cmd(self, cmd):
151 with bb.utils.lock_timeout(self._idlefuncsLock):
152 ret = self.idle_cond.wait_for(self._idle_check, 30)
153 if ret is False:
154 return False
155 self.cooker.command.currentAsyncCommand = cmd
156 return True
157
158 def clear_async_cmd(self):
159 with bb.utils.lock_timeout(self._idlefuncsLock):
160 self.cooker.command.currentAsyncCommand = None
161 self.idle_cond.notify_all()
162
163 def get_async_cmd(self):
164 with bb.utils.lock_timeout(self._idlefuncsLock):
165 return self.cooker.command.currentAsyncCommand
166
167 def main(self):
168 self.cooker.pre_serve()
169
170 bb.utils.set_process_name("Cooker")
171
172 ready = []
173 newconnections = []
174
175 self.controllersock = False
176 fds = [self.sock]
177 if self.xmlrpc:
178 fds.append(self.xmlrpc)
179 seendata = False
180 serverlog("Entering server connection loop")
181 serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))
182
183 def disconnect_client(self, fds):
184 serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
185 if self.controllersock:
186 fds.remove(self.controllersock)
187 self.controllersock.close()
188 self.controllersock = False
189 if self.haveui:
190 # Wait for the idle loop to have cleared (30s max)
191 if not self.wait_for_idle(30):
192 serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
193 self.quit = True
194 fds.remove(self.command_channel)
195 bb.event.unregister_UIHhandler(self.event_handle, True)
196 self.command_channel_reply.writer.close()
197 self.event_writer.writer.close()
198 self.command_channel.close()
199 self.command_channel = False
200 del self.event_writer
201 self.lastui = time.time()
202 self.cooker.clientComplete()
203 self.haveui = False
204 ready = select.select(fds,[],[],0)[0]
205 if newconnections and not self.quit:
206 serverlog("Starting new client")
207 conn = newconnections.pop(-1)
208 fds.append(conn)
209 self.controllersock = conn
210 elif not self.timeout and not ready:
211 serverlog("No timeout, exiting.")
212 self.quit = True
213
214 self.lastui = time.time()
215 while not self.quit:
216 if self.sock in ready:
217 while select.select([self.sock],[],[],0)[0]:
218 controllersock, address = self.sock.accept()
219 if self.controllersock:
220 serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
221 newconnections.append(controllersock)
222 else:
223 serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
224 self.controllersock = controllersock
225 fds.append(controllersock)
226 if self.controllersock in ready:
227 try:
228 serverlog("Processing Client")
229 ui_fds = recvfds(self.controllersock, 3)
230 serverlog("Connecting Client")
231
232 # Where to write events to
233 writer = ConnectionWriter(ui_fds[0])
234 self.event_handle = bb.event.register_UIHhandler(writer, True)
235 self.event_writer = writer
236
237 # Where to read commands from
238 reader = ConnectionReader(ui_fds[1])
239 fds.append(reader)
240 self.command_channel = reader
241
242 # Where to send command return values to
243 writer = ConnectionWriter(ui_fds[2])
244 self.command_channel_reply = writer
245
246 self.haveui = True
247 self.hadanyui = True
248
249 except (EOFError, OSError):
250 disconnect_client(self, fds)
251
252 if not self.timeout == -1.0 and not self.haveui and self.timeout and \
253 (self.lastui + self.timeout) < time.time():
254 serverlog("Server timeout, exiting.")
255 self.quit = True
256
257 # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
258 # one. We have had issue with processes hanging indefinitely so timing out UI-less
259 # servers is useful.
260 if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
261 serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
262 self.quit = True
263
264 if self.command_channel in ready:
265 try:
266 command = self.command_channel.get()
267 except EOFError:
268 # Client connection shutting down
269 ready = []
270 disconnect_client(self, fds)
271 continue
272 if command[0] == "terminateServer":
273 self.quit = True
274 continue
275 try:
276 serverlog("Running command %s" % command)
277 reply = self.cooker.command.runCommand(command, self)
278 serverlog("Sending reply %s" % repr(reply))
279 self.command_channel_reply.send(reply)
280 serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
281 except Exception as e:
282 stack = traceback.format_exc()
283 serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
284 logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))
285
286 if self.xmlrpc in ready:
287 self.xmlrpc.handle_requests()
288
289 if not seendata and hasattr(self.cooker, "data"):
290 heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
291 if heartbeat_event:
292 try:
293 self.heartbeat_seconds = float(heartbeat_event)
294 except:
295 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
296
297 self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
298 try:
299 if self.timeout:
300 self.timeout = float(self.timeout)
301 except:
302 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
303 seendata = True
304
305 if not self.idle:
306 self.idle = threading.Thread(target=self.idle_thread)
307 self.idle.start()
308 elif self.idle and not self.idle.is_alive():
309 serverlog("Idle thread terminated, main thread exiting too")
310 bb.error("Idle thread terminated, main thread exiting too")
311 self.quit = True
312
313 nextsleep = 1.0
314 if self.xmlrpc:
315 nextsleep = self.xmlrpc.get_timeout(nextsleep)
316 try:
317 ready = select.select(fds,[],[],nextsleep)[0]
318 except InterruptedError:
319 # Ignore EINTR
320 ready = []
321
322 if self.idle:
323 self.idle.join()
324
325 serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
326 # Remove the socket file so we don't get any more connections to avoid races
327 # The build directory could have been renamed so if the file isn't the one we created
328 # we shouldn't delete it.
329 try:
330 sockinode = os.stat(self.sockname)[stat.ST_INO]
331 if sockinode == self.sockinode:
332 os.unlink(self.sockname)
333 else:
334 serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
335 except Exception as err:
336 serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
337 self.sock.close()
338
339 try:
340 self.cooker.shutdown(True, idle=False)
341 self.cooker.notifier.stop()
342 self.cooker.confignotifier.stop()
343 except:
344 pass
345
346 self.cooker.post_serve()
347
348 if len(threading.enumerate()) != 1:
349 serverlog("More than one thread left?: " + str(threading.enumerate()))
350
351 # Flush logs before we release the lock
352 sys.stdout.flush()
353 sys.stderr.flush()
354
355 # Finally release the lockfile but warn about other processes holding it open
356 lock = self.bitbake_lock
357 lockfile = self.bitbake_lock_name
358
359 def get_lock_contents(lockfile):
360 try:
361 with open(lockfile, "r") as f:
362 return f.readlines()
363 except FileNotFoundError:
364 return None
365
366 lock.close()
367 lock = None
368
369 while not lock:
370 i = 0
371 lock = None
372 if not os.path.exists(os.path.basename(lockfile)):
373 serverlog("Lockfile directory gone, exiting.")
374 return
375
376 while not lock and i < 30:
377 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
378 if not lock:
379 newlockcontents = get_lock_contents(lockfile)
380 if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]):
381 # A new server was started, the lockfile contents changed, we can exit
382 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
383 return
384 time.sleep(0.1)
385 i += 1
386 if lock:
387 # We hold the lock so we can remove the file (hide stale pid data)
388 # via unlockfile.
389 bb.utils.unlockfile(lock)
390 serverlog("Exiting as we could obtain the lock")
391 return
392
393 if not lock:
394 procs = get_lockfile_process_msg(lockfile)
395 msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
396 if procs:
397 msg.append(":\n%s" % procs)
398 serverlog("".join(msg))
399
400 def idle_thread(self):
401 bb.utils.profile_function("idle" in self.cooker.configuration.profile, self.idle_thread_internal, "profile-idleloop.log")
402
403 def idle_thread_internal(self):
404 def remove_idle_func(function):
405 with bb.utils.lock_timeout(self._idlefuncsLock):
406 del self._idlefuns[function]
407 self.idle_cond.notify_all()
408
409 while not self.quit:
410 nextsleep = 1.0
411 fds = []
412
413 with bb.utils.lock_timeout(self._idlefuncsLock):
414 items = list(self._idlefuns.items())
415
416 for function, data in items:
417 try:
418 retval = function(self, data, False)
419 if isinstance(retval, idleFinish):
420 serverlog("Removing idle function %s at idleFinish" % str(function))
421 remove_idle_func(function)
422 self.cooker.command.finishAsyncCommand(retval.msg)
423 nextsleep = None
424 elif retval is False:
425 serverlog("Removing idle function %s" % str(function))
426 remove_idle_func(function)
427 nextsleep = None
428 elif retval is True:
429 nextsleep = None
430 elif isinstance(retval, float) and nextsleep:
431 if (retval < nextsleep):
432 nextsleep = retval
433 elif nextsleep is None:
434 continue
435 else:
436 fds = fds + retval
437 except SystemExit:
438 raise
439 except Exception as exc:
440 if not isinstance(exc, bb.BBHandledException):
441 logger.exception('Running idle function')
442 remove_idle_func(function)
443 serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
444 self.quit = True
445
446 # Create new heartbeat event?
447 now = time.time()
448 if items and bb.event._heartbeat_enabled and now >= self.next_heartbeat:
449 # We might have missed heartbeats. Just trigger once in
450 # that case and continue after the usual delay.
451 self.next_heartbeat += self.heartbeat_seconds
452 if self.next_heartbeat <= now:
453 self.next_heartbeat = now + self.heartbeat_seconds
454 if hasattr(self.cooker, "data"):
455 heartbeat = bb.event.HeartbeatEvent(now)
456 try:
457 bb.event.fire(heartbeat, self.cooker.data)
458 except Exception as exc:
459 if not isinstance(exc, bb.BBHandledException):
460 logger.exception('Running heartbeat function')
461 serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
462 self.quit = True
463 if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
464 # Shorten timeout so that we we wake up in time for
465 # the heartbeat.
466 nextsleep = self.next_heartbeat - now
467
468 if nextsleep is not None:
469 select.select(fds,[],[],nextsleep)[0]
470
471class ServerCommunicator():
472 def __init__(self, connection, recv):
473 self.connection = connection
474 self.recv = recv
475
476 def runCommand(self, command):
477 try:
478 self.connection.send(command)
479 except BrokenPipeError as e:
480 raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
481 if not self.recv.poll(30):
482 logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
483 if not self.recv.poll(30):
484 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())
485 try:
486 ret, exc = self.recv.get()
487 except EOFError as e:
488 raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
489 # Should probably turn all exceptions in exc back into exceptions?
490 # For now, at least handle BBHandledException
491 if exc and ("BBHandledException" in exc or "SystemExit" in exc):
492 raise bb.BBHandledException()
493 return ret, exc
494
495 def updateFeatureSet(self, featureset):
496 _, error = self.runCommand(["setFeatures", featureset])
497 if error:
498 logger.error("Unable to set the cooker to the correct featureset: %s" % error)
499 raise BaseException(error)
500
501 def getEventHandle(self):
502 handle, error = self.runCommand(["getUIHandlerNum"])
503 if error:
504 logger.error("Unable to get UI Handler Number: %s" % error)
505 raise BaseException(error)
506
507 return handle
508
509 def terminateServer(self):
510 self.connection.send(['terminateServer'])
511 return
512
513class BitBakeProcessServerConnection(object):
514 def __init__(self, ui_channel, recv, eq, sock):
515 self.connection = ServerCommunicator(ui_channel, recv)
516 self.events = eq
517 # Save sock so it doesn't get gc'd for the life of our connection
518 self.socket_connection = sock
519
520 def terminate(self):
521 self.events.close()
522 self.socket_connection.close()
523 self.connection.connection.close()
524 self.connection.recv.close()
525 return
526
527start_log_format = '--- Starting bitbake server pid %s at %s ---'
528start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
529
530class BitBakeServer(object):
531
532 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
533
534 self.server_timeout = server_timeout
535 self.xmlrpcinterface = xmlrpcinterface
536 self.featureset = featureset
537 self.sockname = sockname
538 self.bitbake_lock = lock
539 self.profile = profile
540 self.readypipe, self.readypipein = os.pipe()
541
542 # Place the log in the builddirectory alongside the lock file
543 logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
544 self.logfile = logfile
545
546 startdatetime = datetime.datetime.now()
547 bb.daemonize.createDaemon(self._startServer, logfile)
548 self.bitbake_lock.close()
549 os.close(self.readypipein)
550
551 ready = ConnectionReader(self.readypipe)
552 r = ready.poll(5)
553 if not r:
554 bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
555 r = ready.poll(90)
556 if r:
557 try:
558 r = ready.get()
559 except EOFError:
560 # Trap the child exiting/closing the pipe and error out
561 r = None
562 if not r or r[0] != "r":
563 ready.close()
564 bb.error("Unable to start bitbake server (%s)" % str(r))
565 if os.path.exists(logfile):
566 logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
567 started = False
568 lines = []
569 lastlines = []
570 with open(logfile, "r") as f:
571 for line in f:
572 if started:
573 lines.append(line)
574 else:
575 lastlines.append(line)
576 res = logstart_re.search(line.rstrip())
577 if res:
578 ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format)
579 if ldatetime >= startdatetime:
580 started = True
581 lines.append(line)
582 if len(lastlines) > 60:
583 lastlines = lastlines[-60:]
584 if lines:
585 if len(lines) > 60:
586 bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
587 else:
588 bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
589 elif lastlines:
590 bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
591 else:
592 bb.error("%s doesn't exist" % logfile)
593
594 raise SystemExit(1)
595
596 ready.close()
597
598 def _startServer(self):
599 os.close(self.readypipe)
600 os.set_inheritable(self.bitbake_lock.fileno(), True)
601 os.set_inheritable(self.readypipein, True)
602 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
603 os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(list(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
604
605def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
606
607 import bb.cookerdata
608 import bb.cooker
609
610 serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))
611
612 try:
613 bitbake_lock = os.fdopen(lockfd, "w")
614
615 # Create server control socket
616 if os.path.exists(sockname):
617 serverlog("WARNING: removing existing socket file '%s'" % sockname)
618 os.unlink(sockname)
619
620 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
621 # AF_UNIX has path length issues so chdir here to workaround
622 cwd = os.getcwd()
623 try:
624 os.chdir(os.path.dirname(sockname))
625 sock.bind(os.path.basename(sockname))
626 finally:
627 os.chdir(cwd)
628 sock.listen(1)
629
630 server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
631 writer = ConnectionWriter(readypipeinfd)
632 try:
633 featureset = []
634 cooker = bb.cooker.BBCooker(featureset, server)
635 cooker.configuration.profile = profile
636 except bb.BBHandledException:
637 return None
638 writer.send("r")
639 writer.close()
640 server.cooker = cooker
641 serverlog("Started bitbake server pid %d" % os.getpid())
642
643 server.run()
644 finally:
645 # Flush any messages/errors to the logfile before exit
646 sys.stdout.flush()
647 sys.stderr.flush()
648
649def connectProcessServer(sockname, featureset):
650 # Connect to socket
651 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
652 # AF_UNIX has path length issues so chdir here to workaround
653 cwd = os.getcwd()
654
655 readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
656 eq = command_chan_recv = command_chan = None
657
658 sock.settimeout(10)
659
660 try:
661 try:
662 os.chdir(os.path.dirname(sockname))
663 finished = False
664 while not finished:
665 try:
666 sock.connect(os.path.basename(sockname))
667 finished = True
668 except IOError as e:
669 if e.errno == errno.EWOULDBLOCK:
670 pass
671 raise
672 finally:
673 os.chdir(cwd)
674
675 # Send an fd for the remote to write events to
676 readfd, writefd = os.pipe()
677 eq = BBUIEventQueue(readfd)
678 # Send an fd for the remote to recieve commands from
679 readfd1, writefd1 = os.pipe()
680 command_chan = ConnectionWriter(writefd1)
681 # Send an fd for the remote to write commands results to
682 readfd2, writefd2 = os.pipe()
683 command_chan_recv = ConnectionReader(readfd2)
684
685 sendfds(sock, [writefd, readfd1, writefd2])
686
687 server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
688
689 # Close the ends of the pipes we won't use
690 for i in [writefd, readfd1, writefd2]:
691 os.close(i)
692
693 server_connection.connection.updateFeatureSet(featureset)
694
695 except (Exception, SystemExit) as e:
696 if command_chan_recv:
697 command_chan_recv.close()
698 if command_chan:
699 command_chan.close()
700 for i in [writefd, readfd1, writefd2]:
701 try:
702 if i:
703 os.close(i)
704 except OSError:
705 pass
706 sock.close()
707 raise
708
709 return server_connection
710
711def sendfds(sock, fds):
712 '''Send an array of fds over an AF_UNIX socket.'''
713 fds = array.array('i', fds)
714 msg = bytes([len(fds) % 256])
715 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
716
717def recvfds(sock, size):
718 '''Receive an array of fds over an AF_UNIX socket.'''
719 a = array.array('i')
720 bytes_size = a.itemsize * size
721 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
722 if not msg and not ancdata:
723 raise EOFError
724 try:
725 if len(ancdata) != 1:
726 raise RuntimeError('received %d items of ancdata' %
727 len(ancdata))
728 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
729 if (cmsg_level == socket.SOL_SOCKET and
730 cmsg_type == socket.SCM_RIGHTS):
731 if len(cmsg_data) % a.itemsize != 0:
732 raise ValueError
733 a.frombytes(cmsg_data)
734 assert len(a) % 256 == msg[0]
735 return list(a)
736 except (ValueError, IndexError):
737 pass
738 raise RuntimeError('Invalid data received')
739
740class BBUIEventQueue:
741 def __init__(self, readfd):
742
743 self.eventQueue = []
744 self.eventQueueLock = threading.Lock()
745 self.eventQueueNotify = threading.Event()
746
747 self.reader = ConnectionReader(readfd)
748
749 self.t = threading.Thread()
750 self.t.run = self.startCallbackHandler
751 self.t.start()
752
753 def getEvent(self):
754 with bb.utils.lock_timeout(self.eventQueueLock):
755 if len(self.eventQueue) == 0:
756 return None
757
758 item = self.eventQueue.pop(0)
759 if len(self.eventQueue) == 0:
760 self.eventQueueNotify.clear()
761
762 return item
763
764 def waitEvent(self, delay):
765 self.eventQueueNotify.wait(delay)
766 return self.getEvent()
767
768 def queue_event(self, event):
769 with bb.utils.lock_timeout(self.eventQueueLock):
770 self.eventQueue.append(event)
771 self.eventQueueNotify.set()
772
773 def send_event(self, event):
774 self.queue_event(pickle.loads(event))
775
776 def startCallbackHandler(self):
777 bb.utils.set_process_name("UIEventQueue")
778 while True:
779 try:
780 ready = self.reader.wait(0.25)
781 if ready:
782 event = self.reader.get()
783 self.queue_event(event)
784 except (EOFError, OSError, TypeError):
785 # Easiest way to exit is to close the file descriptor to cause an exit
786 break
787
788 def close(self):
789 self.reader.close()
790 self.t.join()
791
792class ConnectionReader(object):
793
794 def __init__(self, fd):
795 self.reader = multiprocessing.connection.Connection(fd, writable=False)
796 self.rlock = multiprocessing.Lock()
797
798 def wait(self, timeout=None):
799 return multiprocessing.connection.wait([self.reader], timeout)
800
801 def poll(self, timeout=None):
802 return self.reader.poll(timeout)
803
804 def get(self):
805 with bb.utils.lock_timeout(self.rlock):
806 res = self.reader.recv_bytes()
807 return multiprocessing.reduction.ForkingPickler.loads(res)
808
809 def fileno(self):
810 return self.reader.fileno()
811
812 def close(self):
813 return self.reader.close()
814
815
816class ConnectionWriter(object):
817
818 def __init__(self, fd):
819 self.writer = multiprocessing.connection.Connection(fd, readable=False)
820 self.wlock = multiprocessing.Lock()
821 # Why bb.event needs this I have no idea
822 self.event = self
823
824 def _send(self, obj):
825 gc.disable()
826 with bb.utils.lock_timeout(self.wlock):
827 self.writer.send_bytes(obj)
828 gc.enable()
829
830 def send(self, obj):
831 obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
832 # See notes/code in CookerParser
833 # We must not terminate holding this lock else processes will hang.
834 # For SIGTERM, raising afterwards avoids this.
835 # For SIGINT, we don't want to have written partial data to the pipe.
836 # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
837 process = multiprocessing.current_process()
838 if process and hasattr(process, "queue_signals"):
839 with bb.utils.lock_timeout(process.signal_threadlock):
840 process.queue_signals = True
841 self._send(obj)
842 process.queue_signals = False
843
844 while len(process.signal_received) > 0:
845 sig = process.signal_received.pop()
846 process.handle_sig(sig, None)
847 else:
848 self._send(obj)
849
850 def fileno(self):
851 return self.writer.fileno()
852
853 def close(self):
854 return self.writer.close()