summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/server/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/server/process.py')
-rw-r--r--bitbake/lib/bb/server/process.py742
1 files changed, 0 insertions, 742 deletions
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py
deleted file mode 100644
index b27b4aefe0..0000000000
--- a/bitbake/lib/bb/server/process.py
+++ /dev/null
@@ -1,742 +0,0 @@
1#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# SPDX-License-Identifier: GPL-2.0-only
7#
8
9"""
10 This module implements a multiprocessing.Process based server for bitbake.
11"""
12
13import bb
14import bb.event
15import logging
16import multiprocessing
17import threading
18import array
19import os
20import sys
21import time
22import select
23import socket
24import subprocess
25import errno
26import re
27import datetime
28import pickle
29import bb.server.xmlrpcserver
30from bb import daemonize
31from multiprocessing import queues
32
33logger = logging.getLogger('BitBake')
34
35class ProcessTimeout(SystemExit):
36 pass
37
38def serverlog(msg):
39 print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg)
40 sys.stdout.flush()
41
42class ProcessServer():
43 profile_filename = "profile.log"
44 profile_processed_filename = "profile.log.processed"
45
46 def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
47 self.command_channel = False
48 self.command_channel_reply = False
49 self.quit = False
50 self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
51 self.next_heartbeat = time.time()
52
53 self.event_handle = None
54 self.hadanyui = False
55 self.haveui = False
56 self.maxuiwait = 30
57 self.xmlrpc = False
58
59 self._idlefuns = {}
60
61 self.bitbake_lock = lock
62 self.bitbake_lock_name = lockname
63 self.sock = sock
64 self.sockname = sockname
65
66 self.server_timeout = server_timeout
67 self.timeout = self.server_timeout
68 self.xmlrpcinterface = xmlrpcinterface
69
70 def register_idle_function(self, function, data):
71 """Register a function to be called while the server is idle"""
72 assert hasattr(function, '__call__')
73 self._idlefuns[function] = data
74
75 def run(self):
76
77 if self.xmlrpcinterface[0]:
78 self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
79
80 serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
81
82 try:
83 self.bitbake_lock.seek(0)
84 self.bitbake_lock.truncate()
85 if self.xmlrpc:
86 self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
87 else:
88 self.bitbake_lock.write("%s\n" % (os.getpid()))
89 self.bitbake_lock.flush()
90 except Exception as e:
91 serverlog("Error writing to lock file: %s" % str(e))
92 pass
93
94 if self.cooker.configuration.profile:
95 try:
96 import cProfile as profile
97 except:
98 import profile
99 prof = profile.Profile()
100
101 ret = profile.Profile.runcall(prof, self.main)
102
103 prof.dump_stats("profile.log")
104 bb.utils.process_profilelog("profile.log")
105 serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
106
107 else:
108 ret = self.main()
109
110 return ret
111
112 def main(self):
113 self.cooker.pre_serve()
114
115 bb.utils.set_process_name("Cooker")
116
117 ready = []
118 newconnections = []
119
120 self.controllersock = False
121 fds = [self.sock]
122 if self.xmlrpc:
123 fds.append(self.xmlrpc)
124 seendata = False
125 serverlog("Entering server connection loop")
126
127 def disconnect_client(self, fds):
128 serverlog("Disconnecting Client")
129 if self.controllersock:
130 fds.remove(self.controllersock)
131 self.controllersock.close()
132 self.controllersock = False
133 if self.haveui:
134 fds.remove(self.command_channel)
135 bb.event.unregister_UIHhandler(self.event_handle, True)
136 self.command_channel_reply.writer.close()
137 self.event_writer.writer.close()
138 self.command_channel.close()
139 self.command_channel = False
140 del self.event_writer
141 self.lastui = time.time()
142 self.cooker.clientComplete()
143 self.haveui = False
144 ready = select.select(fds,[],[],0)[0]
145 if newconnections:
146 serverlog("Starting new client")
147 conn = newconnections.pop(-1)
148 fds.append(conn)
149 self.controllersock = conn
150 elif self.timeout is None and not ready:
151 serverlog("No timeout, exiting.")
152 self.quit = True
153
154 self.lastui = time.time()
155 while not self.quit:
156 if self.sock in ready:
157 while select.select([self.sock],[],[],0)[0]:
158 controllersock, address = self.sock.accept()
159 if self.controllersock:
160 serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
161 newconnections.append(controllersock)
162 else:
163 serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
164 self.controllersock = controllersock
165 fds.append(controllersock)
166 if self.controllersock in ready:
167 try:
168 serverlog("Processing Client")
169 ui_fds = recvfds(self.controllersock, 3)
170 serverlog("Connecting Client")
171
172 # Where to write events to
173 writer = ConnectionWriter(ui_fds[0])
174 self.event_handle = bb.event.register_UIHhandler(writer, True)
175 self.event_writer = writer
176
177 # Where to read commands from
178 reader = ConnectionReader(ui_fds[1])
179 fds.append(reader)
180 self.command_channel = reader
181
182 # Where to send command return values to
183 writer = ConnectionWriter(ui_fds[2])
184 self.command_channel_reply = writer
185
186 self.haveui = True
187 self.hadanyui = True
188
189 except (EOFError, OSError):
190 disconnect_client(self, fds)
191
192 if not self.timeout == -1.0 and not self.haveui and self.timeout and \
193 (self.lastui + self.timeout) < time.time():
194 serverlog("Server timeout, exiting.")
195 self.quit = True
196
197 # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
198 # one. We have had issue with processes hanging indefinitely so timing out UI-less
199 # servers is useful.
200 if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
201 serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
202 self.quit = True
203
204 if self.command_channel in ready:
205 try:
206 command = self.command_channel.get()
207 except EOFError:
208 # Client connection shutting down
209 ready = []
210 disconnect_client(self, fds)
211 continue
212 if command[0] == "terminateServer":
213 self.quit = True
214 continue
215 try:
216 serverlog("Running command %s" % command)
217 self.command_channel_reply.send(self.cooker.command.runCommand(command))
218 serverlog("Command Completed")
219 except Exception as e:
220 serverlog('Exception in server main event loop running command %s (%s)' % (command, str(e)))
221 logger.exception('Exception in server main event loop running command %s (%s)' % (command, str(e)))
222
223 if self.xmlrpc in ready:
224 self.xmlrpc.handle_requests()
225
226 if not seendata and hasattr(self.cooker, "data"):
227 heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
228 if heartbeat_event:
229 try:
230 self.heartbeat_seconds = float(heartbeat_event)
231 except:
232 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
233
234 self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
235 try:
236 if self.timeout:
237 self.timeout = float(self.timeout)
238 except:
239 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
240 seendata = True
241
242 ready = self.idle_commands(.1, fds)
243
244 if len(threading.enumerate()) != 1:
245 serverlog("More than one thread left?: " + str(threading.enumerate()))
246
247 serverlog("Exiting")
248 # Remove the socket file so we don't get any more connections to avoid races
249 try:
250 os.unlink(self.sockname)
251 except:
252 pass
253 self.sock.close()
254
255 try:
256 self.cooker.shutdown(True)
257 self.cooker.notifier.stop()
258 self.cooker.confignotifier.stop()
259 except:
260 pass
261
262 self.cooker.post_serve()
263
264 # Flush logs before we release the lock
265 sys.stdout.flush()
266 sys.stderr.flush()
267
268 # Finally release the lockfile but warn about other processes holding it open
269 lock = self.bitbake_lock
270 lockfile = self.bitbake_lock_name
271
272 def get_lock_contents(lockfile):
273 try:
274 with open(lockfile, "r") as f:
275 return f.readlines()
276 except FileNotFoundError:
277 return None
278
279 lockcontents = get_lock_contents(lockfile)
280 serverlog("Original lockfile contents: " + str(lockcontents))
281
282 lock.close()
283 lock = None
284
285 while not lock:
286 i = 0
287 lock = None
288 while not lock and i < 30:
289 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
290 if not lock:
291 newlockcontents = get_lock_contents(lockfile)
292 if newlockcontents != lockcontents:
293 # A new server was started, the lockfile contents changed, we can exit
294 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
295 return
296 time.sleep(0.1)
297 i += 1
298 if lock:
299 # We hold the lock so we can remove the file (hide stale pid data)
300 # via unlockfile.
301 bb.utils.unlockfile(lock)
302 serverlog("Exiting as we could obtain the lock")
303 return
304
305 if not lock:
306 # Some systems may not have lsof available
307 procs = None
308 try:
309 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
310 except subprocess.CalledProcessError:
311 # File was deleted?
312 continue
313 except OSError as e:
314 if e.errno != errno.ENOENT:
315 raise
316 if procs is None:
317 # Fall back to fuser if lsof is unavailable
318 try:
319 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
320 except subprocess.CalledProcessError:
321 # File was deleted?
322 continue
323 except OSError as e:
324 if e.errno != errno.ENOENT:
325 raise
326
327 msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
328 if procs:
329 msg += ":\n%s" % str(procs.decode("utf-8"))
330 serverlog(msg)
331
332 def idle_commands(self, delay, fds=None):
333 nextsleep = delay
334 if not fds:
335 fds = []
336
337 for function, data in list(self._idlefuns.items()):
338 try:
339 retval = function(self, data, False)
340 if retval is False:
341 del self._idlefuns[function]
342 nextsleep = None
343 elif retval is True:
344 nextsleep = None
345 elif isinstance(retval, float) and nextsleep:
346 if (retval < nextsleep):
347 nextsleep = retval
348 elif nextsleep is None:
349 continue
350 else:
351 fds = fds + retval
352 except SystemExit:
353 raise
354 except Exception as exc:
355 if not isinstance(exc, bb.BBHandledException):
356 logger.exception('Running idle function')
357 del self._idlefuns[function]
358 self.quit = True
359
360 # Create new heartbeat event?
361 now = time.time()
362 if now >= self.next_heartbeat:
363 # We might have missed heartbeats. Just trigger once in
364 # that case and continue after the usual delay.
365 self.next_heartbeat += self.heartbeat_seconds
366 if self.next_heartbeat <= now:
367 self.next_heartbeat = now + self.heartbeat_seconds
368 if hasattr(self.cooker, "data"):
369 heartbeat = bb.event.HeartbeatEvent(now)
370 bb.event.fire(heartbeat, self.cooker.data)
371 if nextsleep and now + nextsleep > self.next_heartbeat:
372 # Shorten timeout so that we we wake up in time for
373 # the heartbeat.
374 nextsleep = self.next_heartbeat - now
375
376 if nextsleep is not None:
377 if self.xmlrpc:
378 nextsleep = self.xmlrpc.get_timeout(nextsleep)
379 try:
380 return select.select(fds,[],[],nextsleep)[0]
381 except InterruptedError:
382 # Ignore EINTR
383 return []
384 else:
385 return select.select(fds,[],[],0)[0]
386
387
388class ServerCommunicator():
389 def __init__(self, connection, recv):
390 self.connection = connection
391 self.recv = recv
392
393 def runCommand(self, command):
394 self.connection.send(command)
395 if not self.recv.poll(30):
396 logger.info("No reply from server in 30s")
397 if not self.recv.poll(30):
398 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)")
399 ret, exc = self.recv.get()
400 # Should probably turn all exceptions in exc back into exceptions?
401 # For now, at least handle BBHandledException
402 if exc and ("BBHandledException" in exc or "SystemExit" in exc):
403 raise bb.BBHandledException()
404 return ret, exc
405
406 def updateFeatureSet(self, featureset):
407 _, error = self.runCommand(["setFeatures", featureset])
408 if error:
409 logger.error("Unable to set the cooker to the correct featureset: %s" % error)
410 raise BaseException(error)
411
412 def getEventHandle(self):
413 handle, error = self.runCommand(["getUIHandlerNum"])
414 if error:
415 logger.error("Unable to get UI Handler Number: %s" % error)
416 raise BaseException(error)
417
418 return handle
419
420 def terminateServer(self):
421 self.connection.send(['terminateServer'])
422 return
423
424class BitBakeProcessServerConnection(object):
425 def __init__(self, ui_channel, recv, eq, sock):
426 self.connection = ServerCommunicator(ui_channel, recv)
427 self.events = eq
428 # Save sock so it doesn't get gc'd for the life of our connection
429 self.socket_connection = sock
430
431 def terminate(self):
432 self.socket_connection.close()
433 self.connection.connection.close()
434 self.connection.recv.close()
435 return
436
437start_log_format = '--- Starting bitbake server pid %s at %s ---'
438start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
439
440class BitBakeServer(object):
441
442 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface):
443
444 self.server_timeout = server_timeout
445 self.xmlrpcinterface = xmlrpcinterface
446 self.featureset = featureset
447 self.sockname = sockname
448 self.bitbake_lock = lock
449 self.readypipe, self.readypipein = os.pipe()
450
451 # Place the log in the builddirectory alongside the lock file
452 logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
453 self.logfile = logfile
454
455 startdatetime = datetime.datetime.now()
456 bb.daemonize.createDaemon(self._startServer, logfile)
457 self.bitbake_lock.close()
458 os.close(self.readypipein)
459
460 ready = ConnectionReader(self.readypipe)
461 r = ready.poll(5)
462 if not r:
463 bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
464 r = ready.poll(90)
465 if r:
466 try:
467 r = ready.get()
468 except EOFError:
469 # Trap the child exitting/closing the pipe and error out
470 r = None
471 if not r or r[0] != "r":
472 ready.close()
473 bb.error("Unable to start bitbake server (%s)" % str(r))
474 if os.path.exists(logfile):
475 logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
476 started = False
477 lines = []
478 lastlines = []
479 with open(logfile, "r") as f:
480 for line in f:
481 if started:
482 lines.append(line)
483 else:
484 lastlines.append(line)
485 res = logstart_re.search(line.rstrip())
486 if res:
487 ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format)
488 if ldatetime >= startdatetime:
489 started = True
490 lines.append(line)
491 if len(lastlines) > 60:
492 lastlines = lastlines[-60:]
493 if lines:
494 if len(lines) > 60:
495 bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
496 else:
497 bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
498 elif lastlines:
499 bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
500 else:
501 bb.error("%s doesn't exist" % logfile)
502
503 raise SystemExit(1)
504
505 ready.close()
506
507 def _startServer(self):
508 os.close(self.readypipe)
509 os.set_inheritable(self.bitbake_lock.fileno(), True)
510 os.set_inheritable(self.readypipein, True)
511 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
512 os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
513
514def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface):
515
516 import bb.cookerdata
517 import bb.cooker
518
519 serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))
520
521 try:
522 bitbake_lock = os.fdopen(lockfd, "w")
523
524 # Create server control socket
525 if os.path.exists(sockname):
526 os.unlink(sockname)
527
528 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
529 # AF_UNIX has path length issues so chdir here to workaround
530 cwd = os.getcwd()
531 try:
532 os.chdir(os.path.dirname(sockname))
533 sock.bind(os.path.basename(sockname))
534 finally:
535 os.chdir(cwd)
536 sock.listen(1)
537
538 server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
539 writer = ConnectionWriter(readypipeinfd)
540 try:
541 featureset = []
542 cooker = bb.cooker.BBCooker(featureset, server.register_idle_function)
543 except bb.BBHandledException:
544 return None
545 writer.send("r")
546 writer.close()
547 server.cooker = cooker
548 serverlog("Started bitbake server pid %d" % os.getpid())
549
550 server.run()
551 finally:
552 # Flush any ,essages/errors to the logfile before exit
553 sys.stdout.flush()
554 sys.stderr.flush()
555
556def connectProcessServer(sockname, featureset):
557 # Connect to socket
558 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
559 # AF_UNIX has path length issues so chdir here to workaround
560 cwd = os.getcwd()
561
562 readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
563 eq = command_chan_recv = command_chan = None
564
565 sock.settimeout(10)
566
567 try:
568 try:
569 os.chdir(os.path.dirname(sockname))
570 finished = False
571 while not finished:
572 try:
573 sock.connect(os.path.basename(sockname))
574 finished = True
575 except IOError as e:
576 if e.errno == errno.EWOULDBLOCK:
577 pass
578 raise
579 finally:
580 os.chdir(cwd)
581
582 # Send an fd for the remote to write events to
583 readfd, writefd = os.pipe()
584 eq = BBUIEventQueue(readfd)
585 # Send an fd for the remote to recieve commands from
586 readfd1, writefd1 = os.pipe()
587 command_chan = ConnectionWriter(writefd1)
588 # Send an fd for the remote to write commands results to
589 readfd2, writefd2 = os.pipe()
590 command_chan_recv = ConnectionReader(readfd2)
591
592 sendfds(sock, [writefd, readfd1, writefd2])
593
594 server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
595
596 # Close the ends of the pipes we won't use
597 for i in [writefd, readfd1, writefd2]:
598 os.close(i)
599
600 server_connection.connection.updateFeatureSet(featureset)
601
602 except (Exception, SystemExit) as e:
603 if command_chan_recv:
604 command_chan_recv.close()
605 if command_chan:
606 command_chan.close()
607 for i in [writefd, readfd1, writefd2]:
608 try:
609 if i:
610 os.close(i)
611 except OSError:
612 pass
613 sock.close()
614 raise
615
616 return server_connection
617
618def sendfds(sock, fds):
619 '''Send an array of fds over an AF_UNIX socket.'''
620 fds = array.array('i', fds)
621 msg = bytes([len(fds) % 256])
622 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
623
624def recvfds(sock, size):
625 '''Receive an array of fds over an AF_UNIX socket.'''
626 a = array.array('i')
627 bytes_size = a.itemsize * size
628 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
629 if not msg and not ancdata:
630 raise EOFError
631 try:
632 if len(ancdata) != 1:
633 raise RuntimeError('received %d items of ancdata' %
634 len(ancdata))
635 cmsg_level, cmsg_type, cmsg_data = ancdata[0]
636 if (cmsg_level == socket.SOL_SOCKET and
637 cmsg_type == socket.SCM_RIGHTS):
638 if len(cmsg_data) % a.itemsize != 0:
639 raise ValueError
640 a.frombytes(cmsg_data)
641 assert len(a) % 256 == msg[0]
642 return list(a)
643 except (ValueError, IndexError):
644 pass
645 raise RuntimeError('Invalid data received')
646
647class BBUIEventQueue:
648 def __init__(self, readfd):
649
650 self.eventQueue = []
651 self.eventQueueLock = threading.Lock()
652 self.eventQueueNotify = threading.Event()
653
654 self.reader = ConnectionReader(readfd)
655
656 self.t = threading.Thread()
657 self.t.setDaemon(True)
658 self.t.run = self.startCallbackHandler
659 self.t.start()
660
661 def getEvent(self):
662 self.eventQueueLock.acquire()
663
664 if len(self.eventQueue) == 0:
665 self.eventQueueLock.release()
666 return None
667
668 item = self.eventQueue.pop(0)
669
670 if len(self.eventQueue) == 0:
671 self.eventQueueNotify.clear()
672
673 self.eventQueueLock.release()
674 return item
675
676 def waitEvent(self, delay):
677 self.eventQueueNotify.wait(delay)
678 return self.getEvent()
679
680 def queue_event(self, event):
681 self.eventQueueLock.acquire()
682 self.eventQueue.append(event)
683 self.eventQueueNotify.set()
684 self.eventQueueLock.release()
685
686 def send_event(self, event):
687 self.queue_event(pickle.loads(event))
688
689 def startCallbackHandler(self):
690 bb.utils.set_process_name("UIEventQueue")
691 while True:
692 try:
693 self.reader.wait()
694 event = self.reader.get()
695 self.queue_event(event)
696 except EOFError:
697 # Easiest way to exit is to close the file descriptor to cause an exit
698 break
699 self.reader.close()
700
701class ConnectionReader(object):
702
703 def __init__(self, fd):
704 self.reader = multiprocessing.connection.Connection(fd, writable=False)
705 self.rlock = multiprocessing.Lock()
706
707 def wait(self, timeout=None):
708 return multiprocessing.connection.wait([self.reader], timeout)
709
710 def poll(self, timeout=None):
711 return self.reader.poll(timeout)
712
713 def get(self):
714 with self.rlock:
715 res = self.reader.recv_bytes()
716 return multiprocessing.reduction.ForkingPickler.loads(res)
717
718 def fileno(self):
719 return self.reader.fileno()
720
721 def close(self):
722 return self.reader.close()
723
724
725class ConnectionWriter(object):
726
727 def __init__(self, fd):
728 self.writer = multiprocessing.connection.Connection(fd, readable=False)
729 self.wlock = multiprocessing.Lock()
730 # Why bb.event needs this I have no idea
731 self.event = self
732
733 def send(self, obj):
734 obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
735 with self.wlock:
736 self.writer.send_bytes(obj)
737
738 def fileno(self):
739 return self.writer.fileno()
740
741 def close(self):
742 return self.writer.close()