summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/server/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r--bitbake/lib/bb/server/process.py596
1 files changed, 393 insertions, 203 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py
index 48da7fe46c..6edb0213ad 100644
--- a/bitbake/lib/bb/server/process.py
+++ b/bitbake/lib/bb/server/process.py
@@ -22,128 +22,205 @@
22 22
23import bb 23import bb
24import bb.event 24import bb.event
25import itertools
26import logging 25import logging
27import multiprocessing 26import multiprocessing
27import threading
28import array
28import os 29import os
29import signal
30import sys 30import sys
31import time 31import time
32import select 32import select
33from queue import Empty 33import socket
34from multiprocessing import Event, Process, util, Queue, Pipe, queues, Manager 34import subprocess
35import errno
36import bb.server.xmlrpcserver
37from bb import daemonize
38from multiprocessing import queues
35 39
36logger = logging.getLogger('BitBake') 40logger = logging.getLogger('BitBake')
37 41
38class ServerCommunicator(): 42class ProcessTimeout(SystemExit):
39 def __init__(self, connection, event_handle, server): 43 pass
40 self.connection = connection
41 self.event_handle = event_handle
42 self.server = server
43
44 def runCommand(self, command):
45 # @todo try/except
46 self.connection.send(command)
47
48 if not self.server.is_alive():
49 raise SystemExit
50
51 while True:
52 # don't let the user ctrl-c while we're waiting for a response
53 try:
54 for idx in range(0,4): # 0, 1, 2, 3
55 if self.connection.poll(5):
56 return self.connection.recv()
57 else:
58 bb.warn("Timeout while attempting to communicate with bitbake server")
59 bb.fatal("Gave up; Too many tries: timeout while attempting to communicate with bitbake server")
60 except KeyboardInterrupt:
61 pass
62
63 def getEventHandle(self):
64 handle, error = self.runCommand(["getUIHandlerNum"])
65 if error:
66 logger.error("Unable to get UI Handler Number: %s" % error)
67 raise BaseException(error)
68 44
69 return handle 45class ProcessServer(multiprocessing.Process):
70
71class EventAdapter():
72 """
73 Adapter to wrap our event queue since the caller (bb.event) expects to
74 call a send() method, but our actual queue only has put()
75 """
76 def __init__(self, queue):
77 self.queue = queue
78
79 def send(self, event):
80 try:
81 self.queue.put(event)
82 except Exception as err:
83 print("EventAdapter puked: %s" % str(err))
84
85
86class ProcessServer(Process):
87 profile_filename = "profile.log" 46 profile_filename = "profile.log"
88 profile_processed_filename = "profile.log.processed" 47 profile_processed_filename = "profile.log.processed"
89 48
90 def __init__(self, command_channel, event_queue, featurelist): 49 def __init__(self, lock, sock, sockname):
91 self._idlefuns = {} 50 multiprocessing.Process.__init__(self)
92 Process.__init__(self) 51 self.command_channel = False
93 self.command_channel = command_channel 52 self.command_channel_reply = False
94 self.event_queue = event_queue
95 self.event = EventAdapter(event_queue)
96 self.featurelist = featurelist
97 self.quit = False 53 self.quit = False
98 self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. 54 self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
99 self.next_heartbeat = time.time() 55 self.next_heartbeat = time.time()
100 56
101 self.quitin, self.quitout = Pipe() 57 self.event_handle = None
102 self.event_handle = multiprocessing.Value("i") 58 self.haveui = False
59 self.lastui = False
60 self.xmlrpc = False
61
62 self._idlefuns = {}
63
64 self.bitbake_lock = lock
65 self.sock = sock
66 self.sockname = sockname
67
68 def register_idle_function(self, function, data):
69 """Register a function to be called while the server is idle"""
70 assert hasattr(function, '__call__')
71 self._idlefuns[function] = data
103 72
104 def run(self): 73 def run(self):
105 for event in bb.event.ui_queue: 74
106 self.event_queue.put(event) 75 if self.xmlrpcinterface[0]:
107 self.event_handle.value = bb.event.register_UIHhandler(self, True) 76 self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
77
78 print("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
108 79
109 heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') 80 heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
110 if heartbeat_event: 81 if heartbeat_event:
111 try: 82 try:
112 self.heartbeat_seconds = float(heartbeat_event) 83 self.heartbeat_seconds = float(heartbeat_event)
113 except: 84 except:
114 # Throwing an exception here causes bitbake to hang.
115 # Just warn about the invalid setting and continue
116 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) 85 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
86
87 self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
88 try:
89 if self.timeout:
90 self.timeout = float(self.timeout)
91 except:
92 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
93
94
95 try:
96 self.bitbake_lock.seek(0)
97 self.bitbake_lock.truncate()
98 if self.xmlrpcinterface[0]:
99 self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), configuration.interface[0], configuration.interface[1]))
100 else:
101 self.bitbake_lock.write("%s\n" % (os.getpid()))
102 self.bitbake_lock.flush()
103 except:
104 pass
105
117 bb.cooker.server_main(self.cooker, self.main) 106 bb.cooker.server_main(self.cooker, self.main)
118 107
119 def main(self): 108 def main(self):
120 # Ignore SIGINT within the server, as all SIGINT handling is done by
121 # the UI and communicated to us
122 self.quitin.close()
123 signal.signal(signal.SIGINT, signal.SIG_IGN)
124 bb.utils.set_process_name("Cooker") 109 bb.utils.set_process_name("Cooker")
110
111 ready = []
112
113 self.controllersock = False
114 fds = [self.sock]
115 if self.xmlrpc:
116 fds.append(self.xmlrpc)
125 while not self.quit: 117 while not self.quit:
126 try: 118 if self.command_channel in ready:
127 if self.command_channel.poll(): 119 command = self.command_channel.get()
128 command = self.command_channel.recv() 120 if command[0] == "terminateServer":
129 self.runCommand(command)
130 if self.quitout.poll():
131 self.quitout.recv()
132 self.quit = True 121 self.quit = True
133 try: 122 continue
134 self.runCommand(["stateForceShutdown"]) 123 try:
135 except: 124 print("Running command %s" % command)
136 pass 125 self.command_channel_reply.send(self.cooker.command.runCommand(command))
137 126 except Exception as e:
138 self.idle_commands(.1, [self.command_channel, self.quitout]) 127 logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e)))
139 except Exception: 128
140 logger.exception('Running command %s', command) 129 if self.xmlrpc in ready:
130 self.xmlrpc.handle_requests()
131 if self.sock in ready:
132 self.controllersock, address = self.sock.accept()
133 if self.haveui:
134 print("Dropping connection attempt as we have a UI %s" % (str(ready)))
135 self.controllersock.close()
136 else:
137 print("Accepting %s" % (str(ready)))
138 fds.append(self.controllersock)
139 if self.controllersock in ready:
140 try:
141 print("Connecting Client")
142 ui_fds = recvfds(self.controllersock, 3)
143
144 # Where to write events to
145 writer = ConnectionWriter(ui_fds[0])
146 self.event_handle = bb.event.register_UIHhandler(writer, True)
147 self.event_writer = writer
148
149 # Where to read commands from
150 reader = ConnectionReader(ui_fds[1])
151 fds.append(reader)
152 self.command_channel = reader
153
154 # Where to send command return values to
155 writer = ConnectionWriter(ui_fds[2])
156 self.command_channel_reply = writer
157
158 self.haveui = True
159
160 except EOFError:
161 print("Disconnecting Client")
162 fds.remove(self.controllersock)
163 fds.remove(self.command_channel)
164 bb.event.unregister_UIHhandler(self.event_handle, True)
165 self.command_channel_reply.writer.close()
166 self.event_writer.writer.close()
167 del self.event_writer
168 self.controllersock.close()
169 self.haveui = False
170 self.lastui = time.time()
171 self.cooker.clientComplete()
172 if self.timeout is None:
173 print("No timeout, exiting.")
174 self.quit = True
175 if not self.haveui and self.lastui and self.timeout and (self.lastui + self.timeout) < time.time():
176 print("Server timeout, exiting.")
177 self.quit = True
141 178
142 self.event_queue.close() 179 ready = self.idle_commands(.1, fds)
143 bb.event.unregister_UIHhandler(self.event_handle.value, True) 180
144 self.command_channel.close() 181 print("Exiting")
145 self.cooker.shutdown(True) 182 try:
146 self.quitout.close() 183 self.cooker.shutdown(True)
184 except:
185 pass
186
187 # Remove the socket file so we don't get any more connections to avoid races
188 os.unlink(self.sockname)
189 self.sock.close()
190
191 # Finally release the lockfile but warn about other processes holding it open
192 lock = self.bitbake_lock
193 lockfile = lock.name
194 lock.close()
195 lock = None
196
197 while not lock:
198 with bb.utils.timeout(3):
199 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=True)
200 if not lock:
201 # Some systems may not have lsof available
202 procs = None
203 try:
204 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
205 except OSError as e:
206 if e.errno != errno.ENOENT:
207 raise
208 if procs is None:
209 # Fall back to fuser if lsof is unavailable
210 try:
211 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
212 except OSError as e:
213 if e.errno != errno.ENOENT:
214 raise
215
216 msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
217 if procs:
218 msg += ":\n%s" % str(procs)
219 print(msg)
220 return
221 # We hold the lock so we can remove the file (hide stale pid data)
222 bb.utils.remove(lockfile)
223 bb.utils.unlockfile(lock)
147 224
148 def idle_commands(self, delay, fds=None): 225 def idle_commands(self, delay, fds=None):
149 nextsleep = delay 226 nextsleep = delay
@@ -189,140 +266,253 @@ class ProcessServer(Process):
189 nextsleep = self.next_heartbeat - now 266 nextsleep = self.next_heartbeat - now
190 267
191 if nextsleep is not None: 268 if nextsleep is not None:
269 if self.xmlrpc:
270 nextsleep = self.xmlrpc.get_timeout(nextsleep)
192 try: 271 try:
193 select.select(fds,[],[],nextsleep) 272 return select.select(fds,[],[],nextsleep)[0]
194 except InterruptedError: 273 except InterruptedError:
195 # ignore EINTR error, nextsleep only used for wait 274 # Ignore EINTR
196 # certain time 275 return []
197 pass 276 else:
277 return []
278
279
280class ServerCommunicator():
281 def __init__(self, connection, recv):
282 self.connection = connection
283 self.recv = recv
198 284
199 def runCommand(self, command): 285 def runCommand(self, command):
200 """
201 Run a cooker command on the server
202 """
203 self.command_channel.send(self.cooker.command.runCommand(command))
204 286
205 def stop(self): 287 self.connection.send(command)
206 self.quitin.send("quit") 288 while True:
207 self.quitin.close() 289 # don't let the user ctrl-c while we're waiting for a response
290 try:
291 for idx in range(0,4): # 0, 1, 2, 3
292 if self.recv.poll(1):
293 return self.recv.get()
294 else:
295 bb.warn("Timeout while attempting to communicate with bitbake server")
296 raise ProcessTimeout("Gave up; Too many tries: timeout while attempting to communicate with bitbake server")
297 except KeyboardInterrupt:
298 pass
299
300 def updateFeatureSet(self, featureset):
301 _, error = self.runCommand(["setFeatures", featureset])
302 if error:
303 logger.error("Unable to set the cooker to the correct featureset: %s" % error)
304 raise BaseException(error)
305
306 def getEventHandle(self):
307 handle, error = self.runCommand(["getUIHandlerNum"])
308 if error:
309 logger.error("Unable to get UI Handler Number: %s" % error)
310 raise BaseException(error)
208 311
209 def addcooker(self, cooker): 312 return handle
210 self.cooker = cooker
211 313
212 def register_idle_function(self, function, data): 314 def terminateServer(self):
213 """Register a function to be called while the server is idle""" 315 self.connection.send(['terminateServer'])
214 assert hasattr(function, '__call__') 316 return
215 self._idlefuns[function] = data
216 317
217class BitBakeProcessServerConnection(object): 318class BitBakeProcessServerConnection(object):
218 def __init__(self, serverImpl, ui_channel, event_queue): 319 def __init__(self, ui_channel, recv, eq):
219 self.procserver = serverImpl 320 self.connection = ServerCommunicator(ui_channel, recv)
220 self.ui_channel = ui_channel 321 self.events = eq
221 self.event_queue = event_queue
222 self.connection = ServerCommunicator(self.ui_channel, self.procserver.event_handle, self.procserver)
223 self.events = self.event_queue
224 self.terminated = False
225
226 def sigterm_terminate(self):
227 bb.error("UI received SIGTERM")
228 self.terminate()
229 322
230 def terminate(self): 323 def terminate(self):
231 if self.terminated: 324 self.socket_connection.close()
232 return 325 return
233 self.terminated = True 326
234 def flushevents(): 327class BitBakeServer(object):
235 while True: 328 def __init__(self, lock, sockname, configuration, featureset):
236 try: 329
237 event = self.event_queue.get(block=False) 330 self.configuration = configuration
238 except (Empty, IOError): 331 self.featureset = featureset
239 break 332 self.sockname = sockname
240 if isinstance(event, logging.LogRecord): 333 self.bitbake_lock = lock
241 logger.handle(event) 334
242 335 # Create server control socket
243 self.procserver.stop() 336 if os.path.exists(sockname):
244 337 os.unlink(sockname)
245 while self.procserver.is_alive(): 338
246 flushevents() 339 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
247 self.procserver.join(0.1) 340 # AF_UNIX has path length issues so chdir here to workaround
248 341 cwd = os.getcwd()
249 self.ui_channel.close()
250 self.event_queue.close()
251 self.event_queue.setexit()
252 # XXX: Call explicity close in _writer to avoid
253 # fd leakage because isn't called on Queue.close()
254 self.event_queue._writer.close()
255
256 def setupEventQueue(self):
257 pass
258
259# Wrap Queue to provide API which isn't server implementation specific
260class ProcessEventQueue(multiprocessing.queues.Queue):
261 def __init__(self, maxsize):
262 multiprocessing.queues.Queue.__init__(self, maxsize, ctx=multiprocessing.get_context())
263 self.exit = False
264 bb.utils.set_process_name("ProcessEQueue")
265
266 def setexit(self):
267 self.exit = True
268
269 def waitEvent(self, timeout):
270 if self.exit:
271 return self.getEvent()
272 try: 342 try:
273 if not self.server.is_alive(): 343 os.chdir(os.path.dirname(sockname))
274 return self.getEvent() 344 self.sock.bind(os.path.basename(sockname))
275 if timeout == 0: 345 finally:
276 return self.get(False) 346 os.chdir(cwd)
277 return self.get(True, timeout) 347 self.sock.listen(1)
278 except Empty: 348
279 return None 349 os.set_inheritable(self.sock.fileno(), True)
350 bb.daemonize.createDaemon(self._startServer, "bitbake-cookerdaemon.log")
351 self.sock.close()
352 self.bitbake_lock.close()
353
354 def _startServer(self):
355 server = ProcessServer(self.bitbake_lock, self.sock, self.sockname)
356 self.configuration.setServerRegIdleCallback(server.register_idle_function)
357
358 # Copy prefile and postfile to _server variants
359 for param in ('prefile', 'postfile'):
360 value = getattr(self.configuration, param)
361 if value:
362 setattr(self.configuration, "%s_server" % param, value)
363
364 self.cooker = bb.cooker.BBCooker(self.configuration, self.featureset)
365 server.cooker = self.cooker
366 server.server_timeout = self.configuration.server_timeout
367 server.xmlrpcinterface = self.configuration.xmlrpcinterface
368 print("Started bitbake server pid %d" % os.getpid())
369 server.start()
370
371def connectProcessServer(sockname, featureset):
372 # Connect to socket
373 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
374 # AF_UNIX has path length issues so chdir here to workaround
375 cwd = os.getcwd()
376
377 try:
378 os.chdir(os.path.dirname(sockname))
379 sock.connect(os.path.basename(sockname))
380 finally:
381 os.chdir(cwd)
382
383 try:
384 # Send an fd for the remote to write events to
385 readfd, writefd = os.pipe()
386 eq = BBUIEventQueue(readfd)
387 # Send an fd for the remote to recieve commands from
388 readfd1, writefd1 = os.pipe()
389 command_chan = ConnectionWriter(writefd1)
390 # Send an fd for the remote to write commands results to
391 readfd2, writefd2 = os.pipe()
392 command_chan_recv = ConnectionReader(readfd2)
393
394 sendfds(sock, [writefd, readfd1, writefd2])
395
396 server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq)
397
398 server_connection.connection.updateFeatureSet(featureset)
399
400 # Save sock so it doesn't get gc'd for the life of our connection
401 server_connection.socket_connection = sock
402 except:
403 sock.close()
404 raise
405
406 return server_connection
407
408def sendfds(sock, fds):
409 '''Send an array of fds over an AF_UNIX socket.'''
410 fds = array.array('i', fds)
411 msg = bytes([len(fds) % 256])
412 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
413
414def recvfds(sock, size):
415 '''Receive an array of fds over an AF_UNIX socket.'''
416 a = array.array('i')
417 bytes_size = a.itemsize * size
418 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
419 if not msg and not ancdata:
420 raise EOFError
421 try:
422 if len(ancdata) != 1:
423 raise RuntimeError('received %d items of ancdata' %
424 len(ancdata))
425 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
426 if (cmsg_level == socket.SOL_SOCKET and
427 cmsg_type == socket.SCM_RIGHTS):
428 if len(cmsg_data) % a.itemsize != 0:
429 raise ValueError
430 a.frombytes(cmsg_data)
431 assert len(a) % 256 == msg[0]
432 return list(a)
433 except (ValueError, IndexError):
434 pass
435 raise RuntimeError('Invalid data received')
436
437class BBUIEventQueue:
438 def __init__(self, readfd):
439
440 self.eventQueue = []
441 self.eventQueueLock = threading.Lock()
442 self.eventQueueNotify = threading.Event()
443
444 self.reader = ConnectionReader(readfd)
445
446 self.t = threading.Thread()
447 self.t.setDaemon(True)
448 self.t.run = self.startCallbackHandler
449 self.t.start()
280 450
281 def getEvent(self): 451 def getEvent(self):
282 try: 452 self.eventQueueLock.acquire()
283 if not self.server.is_alive(): 453
284 self.setexit() 454 if len(self.eventQueue) == 0:
285 return self.get(False) 455 self.eventQueueLock.release()
286 except Empty:
287 if self.exit:
288 sys.exit(1)
289 return None 456 return None
290 457
291class BitBakeServer(object): 458 item = self.eventQueue.pop(0)
292 def initServer(self, single_use=True):
293 # establish communication channels. We use bidirectional pipes for
294 # ui <--> server command/response pairs
295 # and a queue for server -> ui event notifications
296 #
297 self.ui_channel, self.server_channel = Pipe()
298 self.event_queue = ProcessEventQueue(0)
299 self.serverImpl = ProcessServer(self.server_channel, self.event_queue, None)
300 self.event_queue.server = self.serverImpl
301
302 def detach(self):
303 self.serverImpl.start()
304 return
305 459
306 def establishConnection(self, featureset): 460 if len(self.eventQueue) == 0:
461 self.eventQueueNotify.clear()
307 462
308 self.connection = BitBakeProcessServerConnection(self.serverImpl, self.ui_channel, self.event_queue) 463 self.eventQueueLock.release()
464 return item
309 465
310 _, error = self.connection.connection.runCommand(["setFeatures", featureset]) 466 def waitEvent(self, delay):
311 if error: 467 self.eventQueueNotify.wait(delay)
312 logger.error("Unable to set the cooker to the correct featureset: %s" % error) 468 return self.getEvent()
313 raise BaseException(error)
314 signal.signal(signal.SIGTERM, lambda i, s: self.connection.sigterm_terminate())
315 return self.connection
316 469
317 def addcooker(self, cooker): 470 def queue_event(self, event):
318 self.cooker = cooker 471 self.eventQueueLock.acquire()
319 self.serverImpl.addcooker(cooker) 472 self.eventQueue.append(event)
473 self.eventQueueNotify.set()
474 self.eventQueueLock.release()
320 475
321 def getServerIdleCB(self): 476 def send_event(self, event):
322 return self.serverImpl.register_idle_function 477 self.queue_event(pickle.loads(event))
323 478
324 def saveConnectionDetails(self): 479 def startCallbackHandler(self):
325 return 480 bb.utils.set_process_name("UIEventQueue")
481 while True:
482 self.reader.wait()
483 event = self.reader.get()
484 self.queue_event(event)
485
486class ConnectionReader(object):
487
488 def __init__(self, fd):
489 self.reader = multiprocessing.connection.Connection(fd, writable=False)
490 self.rlock = multiprocessing.Lock()
491
492 def wait(self, timeout=None):
493 return multiprocessing.connection.wait([self.reader], timeout)
494
495 def poll(self, timeout=None):
496 return self.reader.poll(timeout)
497
498 def get(self):
499 with self.rlock:
500 res = self.reader.recv_bytes()
501 return multiprocessing.reduction.ForkingPickler.loads(res)
502
503 def fileno(self):
504 return self.reader.fileno()
505
506class ConnectionWriter(object):
507
508 def __init__(self, fd):
509 self.writer = multiprocessing.connection.Connection(fd, readable=False)
510 self.wlock = multiprocessing.Lock()
511 # Why bb.event needs this I have no idea
512 self.event = self
513
514 def send(self, obj):
515 obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
516 with self.wlock:
517 self.writer.send_bytes(obj)
326 518
327 def endSession(self):
328 self.connection.terminate()