diff options
author | Paul Barker <pbarker@konsulko.com> | 2021-08-19 12:46:43 -0400 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2021-08-23 08:30:54 +0100 |
commit | fb3b05fe8da817967c9f90d4c4c0c1fee87c9f01 (patch) | |
tree | d6fe2fd8a3c3244932d2dbc393209ed638028855 /bitbake | |
parent | 4df610473f21da79164ed01927103d13240d4c2a (diff) | |
download | poky-fb3b05fe8da817967c9f90d4c4c0c1fee87c9f01.tar.gz |
bitbake: prserv: Replace XML RPC with modern asyncrpc implementation
Update the prserv client and server classes to use the modern json and
asyncio based RPC system implemented by the asyncrpc module.
(Bitbake rev: 6a2b23e27bb61185b8afb382e20ce79f996d9183)
Signed-off-by: Paul Barker <pbarker@konsulko.com>
[updated for asyncrpc changes, client split to separate file]
Signed-off-by: Scott Murray <scott.murray@konsulko.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rw-r--r-- | bitbake/lib/prserv/client.py | 41 | ||||
-rw-r--r-- | bitbake/lib/prserv/serv.py | 234 |
2 files changed, 147 insertions, 128 deletions
diff --git a/bitbake/lib/prserv/client.py b/bitbake/lib/prserv/client.py new file mode 100644 index 0000000000..285dce72f6 --- /dev/null +++ b/bitbake/lib/prserv/client.py | |||
@@ -0,0 +1,41 @@ | |||
1 | # | ||
2 | # SPDX-License-Identifier: GPL-2.0-only | ||
3 | # | ||
4 | |||
5 | import logging | ||
6 | import bb.asyncrpc | ||
7 | |||
8 | logger = logging.getLogger("BitBake.PRserv") | ||
9 | |||
10 | class PRAsyncClient(bb.asyncrpc.AsyncClient): | ||
11 | def __init__(self): | ||
12 | super().__init__('PRSERVICE', '1.0', logger) | ||
13 | |||
14 | async def getPR(self, version, pkgarch, checksum): | ||
15 | response = await self.send_message( | ||
16 | {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}} | ||
17 | ) | ||
18 | if response: | ||
19 | return response['value'] | ||
20 | |||
21 | async def importone(self, version, pkgarch, checksum, value): | ||
22 | response = await self.send_message( | ||
23 | {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}} | ||
24 | ) | ||
25 | if response: | ||
26 | return response['value'] | ||
27 | |||
28 | async def export(self, version, pkgarch, checksum, colinfo): | ||
29 | response = await self.send_message( | ||
30 | {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}} | ||
31 | ) | ||
32 | if response: | ||
33 | return (response['metainfo'], response['datainfo']) | ||
34 | |||
35 | class PRClient(bb.asyncrpc.Client): | ||
36 | def __init__(self): | ||
37 | super().__init__() | ||
38 | self._add_methods('getPR', 'importone', 'export') | ||
39 | |||
40 | def _get_async_client(self): | ||
41 | return PRAsyncClient() | ||
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py index 5e322bf83d..1fa4e1766c 100644 --- a/bitbake/lib/prserv/serv.py +++ b/bitbake/lib/prserv/serv.py | |||
@@ -4,157 +4,125 @@ | |||
4 | 4 | ||
5 | import os,sys,logging | 5 | import os,sys,logging |
6 | import signal, time | 6 | import signal, time |
7 | from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler | ||
8 | import socket | 7 | import socket |
9 | import io | 8 | import io |
10 | import sqlite3 | 9 | import sqlite3 |
11 | import bb.server.xmlrpcclient | ||
12 | import prserv | 10 | import prserv |
13 | import prserv.db | 11 | import prserv.db |
14 | import errno | 12 | import errno |
15 | import multiprocessing | 13 | import bb.asyncrpc |
16 | 14 | ||
17 | logger = logging.getLogger("BitBake.PRserv") | 15 | logger = logging.getLogger("BitBake.PRserv") |
18 | 16 | ||
19 | class Handler(SimpleXMLRPCRequestHandler): | ||
20 | def _dispatch(self,method,params): | ||
21 | try: | ||
22 | value=self.server.funcs[method](*params) | ||
23 | except: | ||
24 | import traceback | ||
25 | traceback.print_exc() | ||
26 | raise | ||
27 | return value | ||
28 | |||
29 | PIDPREFIX = "/tmp/PRServer_%s_%s.pid" | 17 | PIDPREFIX = "/tmp/PRServer_%s_%s.pid" |
30 | singleton = None | 18 | singleton = None |
31 | 19 | ||
20 | class PRServerClient(bb.asyncrpc.AsyncServerConnection): | ||
21 | def __init__(self, reader, writer, table): | ||
22 | super().__init__(reader, writer, 'PRSERVICE', logger) | ||
23 | self.handlers.update({ | ||
24 | 'get-pr': self.handle_get_pr, | ||
25 | 'import-one': self.handle_import_one, | ||
26 | 'export': self.handle_export, | ||
27 | }) | ||
28 | self.table = table | ||
32 | 29 | ||
33 | class PRServer(SimpleXMLRPCServer): | 30 | def validate_proto_version(self): |
34 | def __init__(self, dbfile, logfile, interface): | 31 | return (self.proto_version == (1, 0)) |
35 | ''' constructor ''' | ||
36 | try: | ||
37 | SimpleXMLRPCServer.__init__(self, interface, | ||
38 | logRequests=False, allow_none=True) | ||
39 | except socket.error: | ||
40 | ip=socket.gethostbyname(interface[0]) | ||
41 | port=interface[1] | ||
42 | msg="PR Server unable to bind to %s:%s\n" % (ip, port) | ||
43 | sys.stderr.write(msg) | ||
44 | raise PRServiceConfigError | ||
45 | |||
46 | self.dbfile=dbfile | ||
47 | self.logfile=logfile | ||
48 | self.host, self.port = self.socket.getsockname() | ||
49 | 32 | ||
50 | self.register_function(self.getPR, "getPR") | 33 | async def dispatch_message(self, msg): |
51 | self.register_function(self.ping, "ping") | ||
52 | self.register_function(self.export, "export") | ||
53 | self.register_function(self.importone, "importone") | ||
54 | self.register_introspection_functions() | ||
55 | |||
56 | self.iter_count = 0 | ||
57 | # 60 iterations between syncs or sync if dirty every ~30 seconds | ||
58 | self.iterations_between_sync = 60 | ||
59 | |||
60 | def sigint_handler(self, signum, stack): | ||
61 | if self.table: | ||
62 | self.table.sync() | ||
63 | |||
64 | def sigterm_handler(self, signum, stack): | ||
65 | if self.table: | ||
66 | self.table.sync() | ||
67 | raise(SystemExit) | ||
68 | |||
69 | def process_request(self, request, client_address): | ||
70 | if request is None: | ||
71 | return | ||
72 | try: | 34 | try: |
73 | self.finish_request(request, client_address) | 35 | await super().dispatch_message(msg) |
74 | self.shutdown_request(request) | ||
75 | self.iter_count = (self.iter_count + 1) % self.iterations_between_sync | ||
76 | if self.iter_count == 0: | ||
77 | self.table.sync_if_dirty() | ||
78 | except: | 36 | except: |
79 | self.handle_error(request, client_address) | ||
80 | self.shutdown_request(request) | ||
81 | self.table.sync() | 37 | self.table.sync() |
82 | self.table.sync_if_dirty() | 38 | raise |
83 | 39 | ||
84 | def serve_forever(self, poll_interval=0.5): | 40 | self.table.sync_if_dirty() |
85 | signal.signal(signal.SIGINT, self.sigint_handler) | ||
86 | signal.signal(signal.SIGTERM, self.sigterm_handler) | ||
87 | 41 | ||
88 | self.db = prserv.db.PRData(self.dbfile) | 42 | async def handle_get_pr(self, request): |
89 | self.table = self.db["PRMAIN"] | 43 | version = request['version'] |
90 | return super().serve_forever(poll_interval) | 44 | pkgarch = request['pkgarch'] |
45 | checksum = request['checksum'] | ||
91 | 46 | ||
92 | def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): | 47 | response = None |
93 | try: | 48 | try: |
94 | return self.table.export(version, pkgarch, checksum, colinfo) | 49 | value = self.table.getValue(version, pkgarch, checksum) |
50 | response = {'value': value} | ||
51 | except prserv.NotFoundError: | ||
52 | logger.error("can not find value for (%s, %s)",version, checksum) | ||
95 | except sqlite3.Error as exc: | 53 | except sqlite3.Error as exc: |
96 | logger.error(str(exc)) | 54 | logger.error(str(exc)) |
97 | return None | ||
98 | 55 | ||
99 | def importone(self, version, pkgarch, checksum, value): | 56 | self.write_message(response) |
100 | return self.table.importone(version, pkgarch, checksum, value) | ||
101 | 57 | ||
102 | def ping(self): | 58 | async def handle_import_one(self, request): |
103 | return True | 59 | version = request['version'] |
60 | pkgarch = request['pkgarch'] | ||
61 | checksum = request['checksum'] | ||
62 | value = request['value'] | ||
63 | |||
64 | value = self.table.importone(version, pkgarch, checksum, value) | ||
65 | if value is not None: | ||
66 | response = {'value': value} | ||
67 | else: | ||
68 | response = None | ||
69 | self.write_message(response) | ||
104 | 70 | ||
105 | def getinfo(self): | 71 | async def handle_export(self, request): |
106 | return (self.host, self.port) | 72 | version = request['version'] |
73 | pkgarch = request['pkgarch'] | ||
74 | checksum = request['checksum'] | ||
75 | colinfo = request['colinfo'] | ||
107 | 76 | ||
108 | def getPR(self, version, pkgarch, checksum): | ||
109 | try: | 77 | try: |
110 | return self.table.getValue(version, pkgarch, checksum) | 78 | (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo) |
111 | except prserv.NotFoundError: | ||
112 | logger.error("can not find value for (%s, %s)",version, checksum) | ||
113 | return None | ||
114 | except sqlite3.Error as exc: | 79 | except sqlite3.Error as exc: |
115 | logger.error(str(exc)) | 80 | logger.error(str(exc)) |
116 | return None | 81 | metainfo = datainfo = None |
117 | 82 | ||
118 | class PRServSingleton(object): | 83 | response = {'metainfo': metainfo, 'datainfo': datainfo} |
119 | def __init__(self, dbfile, logfile, interface): | 84 | self.write_message(response) |
85 | |||
86 | class PRServer(bb.asyncrpc.AsyncServer): | ||
87 | def __init__(self, dbfile): | ||
88 | super().__init__(logger) | ||
120 | self.dbfile = dbfile | 89 | self.dbfile = dbfile |
121 | self.logfile = logfile | 90 | self.table = None |
122 | self.interface = interface | ||
123 | self.host = None | ||
124 | self.port = None | ||
125 | 91 | ||
126 | def start(self): | 92 | def accept_client(self, reader, writer): |
127 | self.prserv = PRServer(self.dbfile, self.logfile, self.interface) | 93 | return PRServerClient(reader, writer, self.table) |
128 | self.process = multiprocessing.Process(target=self.prserv.serve_forever) | ||
129 | self.process.start() | ||
130 | 94 | ||
131 | self.host, self.port = self.prserv.getinfo() | 95 | def _serve_forever(self): |
96 | self.db = prserv.db.PRData(self.dbfile) | ||
97 | self.table = self.db["PRMAIN"] | ||
132 | 98 | ||
133 | def getinfo(self): | 99 | logger.debug("Started PRServer with DBfile: %s, Address: %s, PID: %s" % |
134 | return (self.host, self.port) | 100 | (self.dbfile, self.address, str(os.getpid()))) |
135 | 101 | ||
136 | class PRServerConnection(object): | 102 | super()._serve_forever() |
137 | def __init__(self, host, port): | ||
138 | if is_local_special(host, port): | ||
139 | host, port = singleton.getinfo() | ||
140 | self.host = host | ||
141 | self.port = port | ||
142 | self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) | ||
143 | 103 | ||
144 | def getPR(self, version, pkgarch, checksum): | 104 | self.table.sync_if_dirty() |
145 | return self.connection.getPR(version, pkgarch, checksum) | 105 | self.db.disconnect() |
146 | 106 | ||
147 | def ping(self): | 107 | def signal_handler(self): |
148 | return self.connection.ping() | 108 | super().signal_handler() |
109 | if self.table: | ||
110 | self.table.sync() | ||
149 | 111 | ||
150 | def export(self,version=None, pkgarch=None, checksum=None, colinfo=True): | 112 | class PRServSingleton(object): |
151 | return self.connection.export(version, pkgarch, checksum, colinfo) | 113 | def __init__(self, dbfile, logfile, host, port): |
114 | self.dbfile = dbfile | ||
115 | self.logfile = logfile | ||
116 | self.host = host | ||
117 | self.port = port | ||
152 | 118 | ||
153 | def importone(self, version, pkgarch, checksum, value): | 119 | def start(self): |
154 | return self.connection.importone(version, pkgarch, checksum, value) | 120 | self.prserv = PRServer(self.dbfile) |
121 | self.prserv.start_tcp_server(self.host, self.port) | ||
122 | self.process = self.prserv.serve_as_process() | ||
155 | 123 | ||
156 | def getinfo(self): | 124 | if not self.port: |
157 | return self.host, self.port | 125 | self.port = int(self.prserv.address.rsplit(':', 1)[1]) |
158 | 126 | ||
159 | def run_as_daemon(func, pidfile, logfile): | 127 | def run_as_daemon(func, pidfile, logfile): |
160 | """ | 128 | """ |
@@ -240,15 +208,13 @@ def start_daemon(dbfile, host, port, logfile): | |||
240 | % pidfile) | 208 | % pidfile) |
241 | return 1 | 209 | return 1 |
242 | 210 | ||
243 | server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) | 211 | dbfile = os.path.abspath(dbfile) |
244 | run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile)) | 212 | def daemon_main(): |
213 | server = PRServer(dbfile) | ||
214 | server.start_tcp_server(host, port) | ||
215 | server.serve_forever() | ||
245 | 216 | ||
246 | # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with | 217 | run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile)) |
247 | # the one the server actually is listening, so at least warn the user about it | ||
248 | _,rport = server.getinfo() | ||
249 | if port != rport: | ||
250 | sys.stdout.write("Server is listening at port %s instead of %s\n" | ||
251 | % (rport,port)) | ||
252 | return 0 | 218 | return 0 |
253 | 219 | ||
254 | def stop_daemon(host, port): | 220 | def stop_daemon(host, port): |
@@ -302,7 +268,7 @@ def is_running(pid): | |||
302 | return True | 268 | return True |
303 | 269 | ||
304 | def is_local_special(host, port): | 270 | def is_local_special(host, port): |
305 | if host.strip().upper() == 'localhost'.upper() and (not port): | 271 | if host.strip().lower() == 'localhost' and not port: |
306 | return True | 272 | return True |
307 | else: | 273 | else: |
308 | return False | 274 | return False |
@@ -340,20 +306,19 @@ def auto_start(d): | |||
340 | auto_shutdown() | 306 | auto_shutdown() |
341 | if not singleton: | 307 | if not singleton: |
342 | bb.utils.mkdirhier(cachedir) | 308 | bb.utils.mkdirhier(cachedir) |
343 | singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0)) | 309 | singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), "localhost", 0) |
344 | singleton.start() | 310 | singleton.start() |
345 | if singleton: | 311 | if singleton: |
346 | host, port = singleton.getinfo() | 312 | host = singleton.host |
313 | port = singleton.port | ||
347 | else: | 314 | else: |
348 | host = host_params[0] | 315 | host = host_params[0] |
349 | port = int(host_params[1]) | 316 | port = int(host_params[1]) |
350 | 317 | ||
351 | try: | 318 | try: |
352 | connection = PRServerConnection(host,port) | 319 | ping(host, port) |
353 | connection.ping() | 320 | return str(host) + ":" + str(port) |
354 | realhost, realport = connection.getinfo() | 321 | |
355 | return str(realhost) + ":" + str(realport) | ||
356 | |||
357 | except Exception: | 322 | except Exception: |
358 | logger.critical("PRservice %s:%d not available" % (host, port)) | 323 | logger.critical("PRservice %s:%d not available" % (host, port)) |
359 | raise PRServiceConfigError | 324 | raise PRServiceConfigError |
@@ -366,8 +331,21 @@ def auto_shutdown(): | |||
366 | singleton = None | 331 | singleton = None |
367 | 332 | ||
368 | def ping(host, port): | 333 | def ping(host, port): |
369 | conn=PRServerConnection(host, port) | 334 | from . import client |
335 | |||
336 | conn = client.PRClient() | ||
337 | conn.connect_tcp(host, port) | ||
370 | return conn.ping() | 338 | return conn.ping() |
371 | 339 | ||
372 | def connect(host, port): | 340 | def connect(host, port): |
373 | return PRServerConnection(host, port) | 341 | from . import client |
342 | |||
343 | global singleton | ||
344 | |||
345 | if host.strip().lower() == 'localhost' and not port: | ||
346 | host = 'localhost' | ||
347 | port = singleton.port | ||
348 | |||
349 | conn = client.PRClient() | ||
350 | conn.connect_tcp(host, port) | ||
351 | return conn | ||