summaryrefslogtreecommitdiffstats
path: root/bitbake
diff options
context:
space:
mode:
authorPaul Barker <pbarker@konsulko.com>2021-04-29 15:11:12 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2021-05-06 11:08:07 +0100
commit81b55a050dbd8d93f16e0cf478a2d49d6404ff91 (patch)
tree4b5e75b9545c434a72ead209c2acbae3da5ae55c /bitbake
parentd66a1d83f53811da8ecbee91e9d0c3f189b661ea (diff)
downloadpoky-81b55a050dbd8d93f16e0cf478a2d49d6404ff91.tar.gz
bitbake: prserv: Handle requests in main thread
The prserver process is cleanly separated from the main bitbake process so requests can be handled in the main thread. This removes the need for a request queue and a separate request handling thread. (Bitbake rev: 6b09415bed6b5e7c12aaf39b677d9ef72844e233) Signed-off-by: Paul Barker <pbarker@konsulko.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rw-r--r--bitbake/lib/prserv/serv.py159
1 files changed, 36 insertions, 123 deletions
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py
index 24a8c4bacf..5e322bf83d 100644
--- a/bitbake/lib/prserv/serv.py
+++ b/bitbake/lib/prserv/serv.py
@@ -5,8 +5,6 @@
5import os,sys,logging 5import os,sys,logging
6import signal, time 6import signal, time
7from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler 7from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
8import threading
9import queue
10import socket 8import socket
11import io 9import io
12import sqlite3 10import sqlite3
@@ -14,7 +12,6 @@ import bb.server.xmlrpcclient
14import prserv 12import prserv
15import prserv.db 13import prserv.db
16import errno 14import errno
17import select
18import multiprocessing 15import multiprocessing
19 16
20logger = logging.getLogger("BitBake.PRserv") 17logger = logging.getLogger("BitBake.PRserv")
@@ -48,54 +45,17 @@ class PRServer(SimpleXMLRPCServer):
48 45
49 self.dbfile=dbfile 46 self.dbfile=dbfile
50 self.logfile=logfile 47 self.logfile=logfile
51 self.working_thread=None
52 self.host, self.port = self.socket.getsockname() 48 self.host, self.port = self.socket.getsockname()
53 self.pidfile=PIDPREFIX % (self.host, self.port)
54 49
55 self.register_function(self.getPR, "getPR") 50 self.register_function(self.getPR, "getPR")
56 self.register_function(self.quit, "quit")
57 self.register_function(self.ping, "ping") 51 self.register_function(self.ping, "ping")
58 self.register_function(self.export, "export") 52 self.register_function(self.export, "export")
59 self.register_function(self.importone, "importone") 53 self.register_function(self.importone, "importone")
60 self.register_introspection_functions() 54 self.register_introspection_functions()
61 55
62 self.quitpipein, self.quitpipeout = os.pipe() 56 self.iter_count = 0
63
64 self.requestqueue = queue.Queue()
65 self.handlerthread = threading.Thread(target = self.process_request_thread)
66 self.handlerthread.daemon = False
67
68 def process_request_thread(self):
69 """Same as in BaseServer but as a thread.
70
71 In addition, exception handling is done here.
72
73 """
74 iter_count = 1
75 # 60 iterations between syncs or sync if dirty every ~30 seconds 57 # 60 iterations between syncs or sync if dirty every ~30 seconds
76 iterations_between_sync = 60 58 self.iterations_between_sync = 60
77
78 bb.utils.set_process_name("PRServ Handler")
79
80 while not self.quitflag:
81 try:
82 (request, client_address) = self.requestqueue.get(True, 30)
83 except queue.Empty:
84 self.table.sync_if_dirty()
85 continue
86 if request is None:
87 continue
88 try:
89 self.finish_request(request, client_address)
90 self.shutdown_request(request)
91 iter_count = (iter_count + 1) % iterations_between_sync
92 if iter_count == 0:
93 self.table.sync_if_dirty()
94 except:
95 self.handle_error(request, client_address)
96 self.shutdown_request(request)
97 self.table.sync()
98 self.table.sync_if_dirty()
99 59
100 def sigint_handler(self, signum, stack): 60 def sigint_handler(self, signum, stack):
101 if self.table: 61 if self.table:
@@ -104,11 +64,30 @@ class PRServer(SimpleXMLRPCServer):
104 def sigterm_handler(self, signum, stack): 64 def sigterm_handler(self, signum, stack):
105 if self.table: 65 if self.table:
106 self.table.sync() 66 self.table.sync()
107 self.quit() 67 raise(SystemExit)
108 self.requestqueue.put((None, None))
109 68
110 def process_request(self, request, client_address): 69 def process_request(self, request, client_address):
111 self.requestqueue.put((request, client_address)) 70 if request is None:
71 return
72 try:
73 self.finish_request(request, client_address)
74 self.shutdown_request(request)
75 self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
76 if self.iter_count == 0:
77 self.table.sync_if_dirty()
78 except:
79 self.handle_error(request, client_address)
80 self.shutdown_request(request)
81 self.table.sync()
82 self.table.sync_if_dirty()
83
84 def serve_forever(self, poll_interval=0.5):
85 signal.signal(signal.SIGINT, self.sigint_handler)
86 signal.signal(signal.SIGTERM, self.sigterm_handler)
87
88 self.db = prserv.db.PRData(self.dbfile)
89 self.table = self.db["PRMAIN"]
90 return super().serve_forever(poll_interval)
112 91
113 def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): 92 def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
114 try: 93 try:
@@ -121,7 +100,7 @@ class PRServer(SimpleXMLRPCServer):
121 return self.table.importone(version, pkgarch, checksum, value) 100 return self.table.importone(version, pkgarch, checksum, value)
122 101
123 def ping(self): 102 def ping(self):
124 return not self.quitflag 103 return True
125 104
126 def getinfo(self): 105 def getinfo(self):
127 return (self.host, self.port) 106 return (self.host, self.port)
@@ -136,45 +115,6 @@ class PRServer(SimpleXMLRPCServer):
136 logger.error(str(exc)) 115 logger.error(str(exc))
137 return None 116 return None
138 117
139 def quit(self):
140 self.quitflag=True
141 os.write(self.quitpipeout, b"q")
142 os.close(self.quitpipeout)
143 return
144
145 def work_forever(self,):
146 self.quitflag = False
147 # This timeout applies to the poll in TCPServer, we need the select
148 # below to wake on our quit pipe closing. We only ever call into handle_request
149 # if there is data there.
150 self.timeout = 0.01
151
152 signal.signal(signal.SIGINT, self.sigint_handler)
153 signal.signal(signal.SIGTERM, self.sigterm_handler)
154
155 bb.utils.set_process_name("PRServ")
156
157 # DB connection must be created after all forks
158 self.db = prserv.db.PRData(self.dbfile)
159 self.table = self.db["PRMAIN"]
160
161 logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
162 (self.dbfile, self.host, self.port, str(os.getpid())))
163
164 self.handlerthread.start()
165 while not self.quitflag:
166 ready = select.select([self.fileno(), self.quitpipein], [], [], 30)
167 if self.quitflag:
168 break
169 if self.fileno() in ready[0]:
170 self.handle_request()
171 self.handlerthread.join()
172 self.db.disconnect()
173 logger.info("PRServer: stopping...")
174 self.server_close()
175 os.close(self.quitpipein)
176 return
177
178class PRServSingleton(object): 118class PRServSingleton(object):
179 def __init__(self, dbfile, logfile, interface): 119 def __init__(self, dbfile, logfile, interface):
180 self.dbfile = dbfile 120 self.dbfile = dbfile
@@ -185,7 +125,7 @@ class PRServSingleton(object):
185 125
186 def start(self): 126 def start(self):
187 self.prserv = PRServer(self.dbfile, self.logfile, self.interface) 127 self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
188 self.process = multiprocessing.Process(target=self.prserv.work_forever) 128 self.process = multiprocessing.Process(target=self.prserv.serve_forever)
189 self.process.start() 129 self.process.start()
190 130
191 self.host, self.port = self.prserv.getinfo() 131 self.host, self.port = self.prserv.getinfo()
@@ -201,13 +141,6 @@ class PRServerConnection(object):
201 self.port = port 141 self.port = port
202 self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) 142 self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
203 143
204 def terminate(self):
205 try:
206 logger.info("Terminating PRServer...")
207 self.connection.quit()
208 except Exception as exc:
209 sys.stderr.write("%s\n" % str(exc))
210
211 def getPR(self, version, pkgarch, checksum): 144 def getPR(self, version, pkgarch, checksum):
212 return self.connection.getPR(version, pkgarch, checksum) 145 return self.connection.getPR(version, pkgarch, checksum)
213 146
@@ -308,7 +241,7 @@ def start_daemon(dbfile, host, port, logfile):
308 return 1 241 return 1
309 242
310 server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) 243 server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
311 run_as_daemon(server.work_forever, pidfile, os.path.abspath(logfile)) 244 run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile))
312 245
313 # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with 246 # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
314 # the one the server actually is listening, so at least warn the user about it 247 # the one the server actually is listening, so at least warn the user about it
@@ -345,25 +278,13 @@ def stop_daemon(host, port):
345 return 1 278 return 1
346 279
347 try: 280 try:
348 PRServerConnection(ip, port).terminate() 281 if is_running(pid):
349 except: 282 print("Sending SIGTERM to pr-server.")
350 logger.critical("Stop PRService %s:%d failed" % (host,port)) 283 os.kill(pid, signal.SIGTERM)
284 time.sleep(0.1)
351 285
352 try: 286 if os.path.exists(pidfile):
353 if pid: 287 os.remove(pidfile)
354 wait_timeout = 0
355 print("Waiting for pr-server to exit.")
356 while is_running(pid) and wait_timeout < 50:
357 time.sleep(0.1)
358 wait_timeout += 1
359
360 if is_running(pid):
361 print("Sending SIGTERM to pr-server.")
362 os.kill(pid,signal.SIGTERM)
363 time.sleep(0.1)
364
365 if os.path.exists(pidfile):
366 os.remove(pidfile)
367 288
368 except OSError as e: 289 except OSError as e:
369 err = str(e) 290 err = str(e)
@@ -439,17 +360,9 @@ def auto_start(d):
439 360
440def auto_shutdown(): 361def auto_shutdown():
441 global singleton 362 global singleton
442 if singleton: 363 if singleton and singleton.process:
443 host, port = singleton.getinfo() 364 singleton.process.terminate()
444 try: 365 singleton.process.join()
445 PRServerConnection(host, port).terminate()
446 except:
447 logger.critical("Stop PRService %s:%d failed" % (host,port))
448
449 try:
450 os.waitpid(singleton.prserv.pid, 0)
451 except ChildProcessError:
452 pass
453 singleton = None 366 singleton = None
454 367
455def ping(host, port): 368def ping(host, port):