diff options
author | Paul Barker <pbarker@konsulko.com> | 2021-04-29 15:11:12 +0100 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2021-05-06 11:08:07 +0100 |
commit | 81b55a050dbd8d93f16e0cf478a2d49d6404ff91 (patch) | |
tree | 4b5e75b9545c434a72ead209c2acbae3da5ae55c /bitbake | |
parent | d66a1d83f53811da8ecbee91e9d0c3f189b661ea (diff) | |
download | poky-81b55a050dbd8d93f16e0cf478a2d49d6404ff91.tar.gz |
bitbake: prserv: Handle requests in main thread
The prserver process is cleanly separated from the main bitbake process
so requests can be handled in the main thread. This removes the need for
a request queue and a separate request handling thread.
(Bitbake rev: 6b09415bed6b5e7c12aaf39b677d9ef72844e233)
Signed-off-by: Paul Barker <pbarker@konsulko.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rw-r--r-- | bitbake/lib/prserv/serv.py | 159 |
1 files changed, 36 insertions, 123 deletions
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py index 24a8c4bacf..5e322bf83d 100644 --- a/bitbake/lib/prserv/serv.py +++ b/bitbake/lib/prserv/serv.py | |||
@@ -5,8 +5,6 @@ | |||
5 | import os,sys,logging | 5 | import os,sys,logging |
6 | import signal, time | 6 | import signal, time |
7 | from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler | 7 | from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler |
8 | import threading | ||
9 | import queue | ||
10 | import socket | 8 | import socket |
11 | import io | 9 | import io |
12 | import sqlite3 | 10 | import sqlite3 |
@@ -14,7 +12,6 @@ import bb.server.xmlrpcclient | |||
14 | import prserv | 12 | import prserv |
15 | import prserv.db | 13 | import prserv.db |
16 | import errno | 14 | import errno |
17 | import select | ||
18 | import multiprocessing | 15 | import multiprocessing |
19 | 16 | ||
20 | logger = logging.getLogger("BitBake.PRserv") | 17 | logger = logging.getLogger("BitBake.PRserv") |
@@ -48,54 +45,17 @@ class PRServer(SimpleXMLRPCServer): | |||
48 | 45 | ||
49 | self.dbfile=dbfile | 46 | self.dbfile=dbfile |
50 | self.logfile=logfile | 47 | self.logfile=logfile |
51 | self.working_thread=None | ||
52 | self.host, self.port = self.socket.getsockname() | 48 | self.host, self.port = self.socket.getsockname() |
53 | self.pidfile=PIDPREFIX % (self.host, self.port) | ||
54 | 49 | ||
55 | self.register_function(self.getPR, "getPR") | 50 | self.register_function(self.getPR, "getPR") |
56 | self.register_function(self.quit, "quit") | ||
57 | self.register_function(self.ping, "ping") | 51 | self.register_function(self.ping, "ping") |
58 | self.register_function(self.export, "export") | 52 | self.register_function(self.export, "export") |
59 | self.register_function(self.importone, "importone") | 53 | self.register_function(self.importone, "importone") |
60 | self.register_introspection_functions() | 54 | self.register_introspection_functions() |
61 | 55 | ||
62 | self.quitpipein, self.quitpipeout = os.pipe() | 56 | self.iter_count = 0 |
63 | |||
64 | self.requestqueue = queue.Queue() | ||
65 | self.handlerthread = threading.Thread(target = self.process_request_thread) | ||
66 | self.handlerthread.daemon = False | ||
67 | |||
68 | def process_request_thread(self): | ||
69 | """Same as in BaseServer but as a thread. | ||
70 | |||
71 | In addition, exception handling is done here. | ||
72 | |||
73 | """ | ||
74 | iter_count = 1 | ||
75 | # 60 iterations between syncs or sync if dirty every ~30 seconds | 57 | # 60 iterations between syncs or sync if dirty every ~30 seconds |
76 | iterations_between_sync = 60 | 58 | self.iterations_between_sync = 60 |
77 | |||
78 | bb.utils.set_process_name("PRServ Handler") | ||
79 | |||
80 | while not self.quitflag: | ||
81 | try: | ||
82 | (request, client_address) = self.requestqueue.get(True, 30) | ||
83 | except queue.Empty: | ||
84 | self.table.sync_if_dirty() | ||
85 | continue | ||
86 | if request is None: | ||
87 | continue | ||
88 | try: | ||
89 | self.finish_request(request, client_address) | ||
90 | self.shutdown_request(request) | ||
91 | iter_count = (iter_count + 1) % iterations_between_sync | ||
92 | if iter_count == 0: | ||
93 | self.table.sync_if_dirty() | ||
94 | except: | ||
95 | self.handle_error(request, client_address) | ||
96 | self.shutdown_request(request) | ||
97 | self.table.sync() | ||
98 | self.table.sync_if_dirty() | ||
99 | 59 | ||
100 | def sigint_handler(self, signum, stack): | 60 | def sigint_handler(self, signum, stack): |
101 | if self.table: | 61 | if self.table: |
@@ -104,11 +64,30 @@ class PRServer(SimpleXMLRPCServer): | |||
104 | def sigterm_handler(self, signum, stack): | 64 | def sigterm_handler(self, signum, stack): |
105 | if self.table: | 65 | if self.table: |
106 | self.table.sync() | 66 | self.table.sync() |
107 | self.quit() | 67 | raise(SystemExit) |
108 | self.requestqueue.put((None, None)) | ||
109 | 68 | ||
110 | def process_request(self, request, client_address): | 69 | def process_request(self, request, client_address): |
111 | self.requestqueue.put((request, client_address)) | 70 | if request is None: |
71 | return | ||
72 | try: | ||
73 | self.finish_request(request, client_address) | ||
74 | self.shutdown_request(request) | ||
75 | self.iter_count = (self.iter_count + 1) % self.iterations_between_sync | ||
76 | if self.iter_count == 0: | ||
77 | self.table.sync_if_dirty() | ||
78 | except: | ||
79 | self.handle_error(request, client_address) | ||
80 | self.shutdown_request(request) | ||
81 | self.table.sync() | ||
82 | self.table.sync_if_dirty() | ||
83 | |||
84 | def serve_forever(self, poll_interval=0.5): | ||
85 | signal.signal(signal.SIGINT, self.sigint_handler) | ||
86 | signal.signal(signal.SIGTERM, self.sigterm_handler) | ||
87 | |||
88 | self.db = prserv.db.PRData(self.dbfile) | ||
89 | self.table = self.db["PRMAIN"] | ||
90 | return super().serve_forever(poll_interval) | ||
112 | 91 | ||
113 | def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): | 92 | def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): |
114 | try: | 93 | try: |
@@ -121,7 +100,7 @@ class PRServer(SimpleXMLRPCServer): | |||
121 | return self.table.importone(version, pkgarch, checksum, value) | 100 | return self.table.importone(version, pkgarch, checksum, value) |
122 | 101 | ||
123 | def ping(self): | 102 | def ping(self): |
124 | return not self.quitflag | 103 | return True |
125 | 104 | ||
126 | def getinfo(self): | 105 | def getinfo(self): |
127 | return (self.host, self.port) | 106 | return (self.host, self.port) |
@@ -136,45 +115,6 @@ class PRServer(SimpleXMLRPCServer): | |||
136 | logger.error(str(exc)) | 115 | logger.error(str(exc)) |
137 | return None | 116 | return None |
138 | 117 | ||
139 | def quit(self): | ||
140 | self.quitflag=True | ||
141 | os.write(self.quitpipeout, b"q") | ||
142 | os.close(self.quitpipeout) | ||
143 | return | ||
144 | |||
145 | def work_forever(self,): | ||
146 | self.quitflag = False | ||
147 | # This timeout applies to the poll in TCPServer, we need the select | ||
148 | # below to wake on our quit pipe closing. We only ever call into handle_request | ||
149 | # if there is data there. | ||
150 | self.timeout = 0.01 | ||
151 | |||
152 | signal.signal(signal.SIGINT, self.sigint_handler) | ||
153 | signal.signal(signal.SIGTERM, self.sigterm_handler) | ||
154 | |||
155 | bb.utils.set_process_name("PRServ") | ||
156 | |||
157 | # DB connection must be created after all forks | ||
158 | self.db = prserv.db.PRData(self.dbfile) | ||
159 | self.table = self.db["PRMAIN"] | ||
160 | |||
161 | logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" % | ||
162 | (self.dbfile, self.host, self.port, str(os.getpid()))) | ||
163 | |||
164 | self.handlerthread.start() | ||
165 | while not self.quitflag: | ||
166 | ready = select.select([self.fileno(), self.quitpipein], [], [], 30) | ||
167 | if self.quitflag: | ||
168 | break | ||
169 | if self.fileno() in ready[0]: | ||
170 | self.handle_request() | ||
171 | self.handlerthread.join() | ||
172 | self.db.disconnect() | ||
173 | logger.info("PRServer: stopping...") | ||
174 | self.server_close() | ||
175 | os.close(self.quitpipein) | ||
176 | return | ||
177 | |||
178 | class PRServSingleton(object): | 118 | class PRServSingleton(object): |
179 | def __init__(self, dbfile, logfile, interface): | 119 | def __init__(self, dbfile, logfile, interface): |
180 | self.dbfile = dbfile | 120 | self.dbfile = dbfile |
@@ -185,7 +125,7 @@ class PRServSingleton(object): | |||
185 | 125 | ||
186 | def start(self): | 126 | def start(self): |
187 | self.prserv = PRServer(self.dbfile, self.logfile, self.interface) | 127 | self.prserv = PRServer(self.dbfile, self.logfile, self.interface) |
188 | self.process = multiprocessing.Process(target=self.prserv.work_forever) | 128 | self.process = multiprocessing.Process(target=self.prserv.serve_forever) |
189 | self.process.start() | 129 | self.process.start() |
190 | 130 | ||
191 | self.host, self.port = self.prserv.getinfo() | 131 | self.host, self.port = self.prserv.getinfo() |
@@ -201,13 +141,6 @@ class PRServerConnection(object): | |||
201 | self.port = port | 141 | self.port = port |
202 | self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) | 142 | self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) |
203 | 143 | ||
204 | def terminate(self): | ||
205 | try: | ||
206 | logger.info("Terminating PRServer...") | ||
207 | self.connection.quit() | ||
208 | except Exception as exc: | ||
209 | sys.stderr.write("%s\n" % str(exc)) | ||
210 | |||
211 | def getPR(self, version, pkgarch, checksum): | 144 | def getPR(self, version, pkgarch, checksum): |
212 | return self.connection.getPR(version, pkgarch, checksum) | 145 | return self.connection.getPR(version, pkgarch, checksum) |
213 | 146 | ||
@@ -308,7 +241,7 @@ def start_daemon(dbfile, host, port, logfile): | |||
308 | return 1 | 241 | return 1 |
309 | 242 | ||
310 | server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) | 243 | server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) |
311 | run_as_daemon(server.work_forever, pidfile, os.path.abspath(logfile)) | 244 | run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile)) |
312 | 245 | ||
313 | # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with | 246 | # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with |
314 | # the one the server actually is listening, so at least warn the user about it | 247 | # the one the server actually is listening, so at least warn the user about it |
@@ -345,25 +278,13 @@ def stop_daemon(host, port): | |||
345 | return 1 | 278 | return 1 |
346 | 279 | ||
347 | try: | 280 | try: |
348 | PRServerConnection(ip, port).terminate() | 281 | if is_running(pid): |
349 | except: | 282 | print("Sending SIGTERM to pr-server.") |
350 | logger.critical("Stop PRService %s:%d failed" % (host,port)) | 283 | os.kill(pid, signal.SIGTERM) |
284 | time.sleep(0.1) | ||
351 | 285 | ||
352 | try: | 286 | if os.path.exists(pidfile): |
353 | if pid: | 287 | os.remove(pidfile) |
354 | wait_timeout = 0 | ||
355 | print("Waiting for pr-server to exit.") | ||
356 | while is_running(pid) and wait_timeout < 50: | ||
357 | time.sleep(0.1) | ||
358 | wait_timeout += 1 | ||
359 | |||
360 | if is_running(pid): | ||
361 | print("Sending SIGTERM to pr-server.") | ||
362 | os.kill(pid,signal.SIGTERM) | ||
363 | time.sleep(0.1) | ||
364 | |||
365 | if os.path.exists(pidfile): | ||
366 | os.remove(pidfile) | ||
367 | 288 | ||
368 | except OSError as e: | 289 | except OSError as e: |
369 | err = str(e) | 290 | err = str(e) |
@@ -439,17 +360,9 @@ def auto_start(d): | |||
439 | 360 | ||
440 | def auto_shutdown(): | 361 | def auto_shutdown(): |
441 | global singleton | 362 | global singleton |
442 | if singleton: | 363 | if singleton and singleton.process: |
443 | host, port = singleton.getinfo() | 364 | singleton.process.terminate() |
444 | try: | 365 | singleton.process.join() |
445 | PRServerConnection(host, port).terminate() | ||
446 | except: | ||
447 | logger.critical("Stop PRService %s:%d failed" % (host,port)) | ||
448 | |||
449 | try: | ||
450 | os.waitpid(singleton.prserv.pid, 0) | ||
451 | except ChildProcessError: | ||
452 | pass | ||
453 | singleton = None | 366 | singleton = None |
454 | 367 | ||
455 | def ping(host, port): | 368 | def ping(host, port): |