summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/prserv
diff options
context:
space:
mode:
authorPaul Barker <pbarker@konsulko.com>2021-08-19 12:46:43 -0400
committerRichard Purdie <richard.purdie@linuxfoundation.org>2021-08-23 08:30:54 +0100
commitfb3b05fe8da817967c9f90d4c4c0c1fee87c9f01 (patch)
treed6fe2fd8a3c3244932d2dbc393209ed638028855 /bitbake/lib/prserv
parent4df610473f21da79164ed01927103d13240d4c2a (diff)
downloadpoky-fb3b05fe8da817967c9f90d4c4c0c1fee87c9f01.tar.gz
bitbake: prserv: Replace XML RPC with modern asyncrpc implementation
Update the prserv client and server classes to use the modern json and asyncio based RPC system implemented by the asyncrpc module. (Bitbake rev: 6a2b23e27bb61185b8afb382e20ce79f996d9183) Signed-off-by: Paul Barker <pbarker@konsulko.com> [updated for asyncrpc changes, client split to separate file] Signed-off-by: Scott Murray <scott.murray@konsulko.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/prserv')
-rw-r--r--bitbake/lib/prserv/client.py41
-rw-r--r--bitbake/lib/prserv/serv.py234
2 files changed, 147 insertions, 128 deletions
diff --git a/bitbake/lib/prserv/client.py b/bitbake/lib/prserv/client.py
new file mode 100644
index 0000000000..285dce72f6
--- /dev/null
+++ b/bitbake/lib/prserv/client.py
@@ -0,0 +1,41 @@
1#
2# SPDX-License-Identifier: GPL-2.0-only
3#
4
5import logging
6import bb.asyncrpc
7
8logger = logging.getLogger("BitBake.PRserv")
9
10class PRAsyncClient(bb.asyncrpc.AsyncClient):
11 def __init__(self):
12 super().__init__('PRSERVICE', '1.0', logger)
13
14 async def getPR(self, version, pkgarch, checksum):
15 response = await self.send_message(
16 {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}}
17 )
18 if response:
19 return response['value']
20
21 async def importone(self, version, pkgarch, checksum, value):
22 response = await self.send_message(
23 {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}}
24 )
25 if response:
26 return response['value']
27
28 async def export(self, version, pkgarch, checksum, colinfo):
29 response = await self.send_message(
30 {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}}
31 )
32 if response:
33 return (response['metainfo'], response['datainfo'])
34
35class PRClient(bb.asyncrpc.Client):
36 def __init__(self):
37 super().__init__()
38 self._add_methods('getPR', 'importone', 'export')
39
40 def _get_async_client(self):
41 return PRAsyncClient()
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py
index 5e322bf83d..1fa4e1766c 100644
--- a/bitbake/lib/prserv/serv.py
+++ b/bitbake/lib/prserv/serv.py
@@ -4,157 +4,125 @@
4 4
5import os,sys,logging 5import os,sys,logging
6import signal, time 6import signal, time
7from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
8import socket 7import socket
9import io 8import io
10import sqlite3 9import sqlite3
11import bb.server.xmlrpcclient
12import prserv 10import prserv
13import prserv.db 11import prserv.db
14import errno 12import errno
15import multiprocessing 13import bb.asyncrpc
16 14
17logger = logging.getLogger("BitBake.PRserv") 15logger = logging.getLogger("BitBake.PRserv")
18 16
19class Handler(SimpleXMLRPCRequestHandler):
20 def _dispatch(self,method,params):
21 try:
22 value=self.server.funcs[method](*params)
23 except:
24 import traceback
25 traceback.print_exc()
26 raise
27 return value
28
29PIDPREFIX = "/tmp/PRServer_%s_%s.pid" 17PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
30singleton = None 18singleton = None
31 19
20class PRServerClient(bb.asyncrpc.AsyncServerConnection):
21 def __init__(self, reader, writer, table):
22 super().__init__(reader, writer, 'PRSERVICE', logger)
23 self.handlers.update({
24 'get-pr': self.handle_get_pr,
25 'import-one': self.handle_import_one,
26 'export': self.handle_export,
27 })
28 self.table = table
32 29
33class PRServer(SimpleXMLRPCServer): 30 def validate_proto_version(self):
34 def __init__(self, dbfile, logfile, interface): 31 return (self.proto_version == (1, 0))
35 ''' constructor '''
36 try:
37 SimpleXMLRPCServer.__init__(self, interface,
38 logRequests=False, allow_none=True)
39 except socket.error:
40 ip=socket.gethostbyname(interface[0])
41 port=interface[1]
42 msg="PR Server unable to bind to %s:%s\n" % (ip, port)
43 sys.stderr.write(msg)
44 raise PRServiceConfigError
45
46 self.dbfile=dbfile
47 self.logfile=logfile
48 self.host, self.port = self.socket.getsockname()
49 32
50 self.register_function(self.getPR, "getPR") 33 async def dispatch_message(self, msg):
51 self.register_function(self.ping, "ping")
52 self.register_function(self.export, "export")
53 self.register_function(self.importone, "importone")
54 self.register_introspection_functions()
55
56 self.iter_count = 0
57 # 60 iterations between syncs or sync if dirty every ~30 seconds
58 self.iterations_between_sync = 60
59
60 def sigint_handler(self, signum, stack):
61 if self.table:
62 self.table.sync()
63
64 def sigterm_handler(self, signum, stack):
65 if self.table:
66 self.table.sync()
67 raise(SystemExit)
68
69 def process_request(self, request, client_address):
70 if request is None:
71 return
72 try: 34 try:
73 self.finish_request(request, client_address) 35 await super().dispatch_message(msg)
74 self.shutdown_request(request)
75 self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
76 if self.iter_count == 0:
77 self.table.sync_if_dirty()
78 except: 36 except:
79 self.handle_error(request, client_address)
80 self.shutdown_request(request)
81 self.table.sync() 37 self.table.sync()
82 self.table.sync_if_dirty() 38 raise
83 39
84 def serve_forever(self, poll_interval=0.5): 40 self.table.sync_if_dirty()
85 signal.signal(signal.SIGINT, self.sigint_handler)
86 signal.signal(signal.SIGTERM, self.sigterm_handler)
87 41
88 self.db = prserv.db.PRData(self.dbfile) 42 async def handle_get_pr(self, request):
89 self.table = self.db["PRMAIN"] 43 version = request['version']
90 return super().serve_forever(poll_interval) 44 pkgarch = request['pkgarch']
45 checksum = request['checksum']
91 46
92 def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): 47 response = None
93 try: 48 try:
94 return self.table.export(version, pkgarch, checksum, colinfo) 49 value = self.table.getValue(version, pkgarch, checksum)
50 response = {'value': value}
51 except prserv.NotFoundError:
52 logger.error("can not find value for (%s, %s)",version, checksum)
95 except sqlite3.Error as exc: 53 except sqlite3.Error as exc:
96 logger.error(str(exc)) 54 logger.error(str(exc))
97 return None
98 55
99 def importone(self, version, pkgarch, checksum, value): 56 self.write_message(response)
100 return self.table.importone(version, pkgarch, checksum, value)
101 57
102 def ping(self): 58 async def handle_import_one(self, request):
103 return True 59 version = request['version']
60 pkgarch = request['pkgarch']
61 checksum = request['checksum']
62 value = request['value']
63
64 value = self.table.importone(version, pkgarch, checksum, value)
65 if value is not None:
66 response = {'value': value}
67 else:
68 response = None
69 self.write_message(response)
104 70
105 def getinfo(self): 71 async def handle_export(self, request):
106 return (self.host, self.port) 72 version = request['version']
73 pkgarch = request['pkgarch']
74 checksum = request['checksum']
75 colinfo = request['colinfo']
107 76
108 def getPR(self, version, pkgarch, checksum):
109 try: 77 try:
110 return self.table.getValue(version, pkgarch, checksum) 78 (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo)
111 except prserv.NotFoundError:
112 logger.error("can not find value for (%s, %s)",version, checksum)
113 return None
114 except sqlite3.Error as exc: 79 except sqlite3.Error as exc:
115 logger.error(str(exc)) 80 logger.error(str(exc))
116 return None 81 metainfo = datainfo = None
117 82
118class PRServSingleton(object): 83 response = {'metainfo': metainfo, 'datainfo': datainfo}
119 def __init__(self, dbfile, logfile, interface): 84 self.write_message(response)
85
86class PRServer(bb.asyncrpc.AsyncServer):
87 def __init__(self, dbfile):
88 super().__init__(logger)
120 self.dbfile = dbfile 89 self.dbfile = dbfile
121 self.logfile = logfile 90 self.table = None
122 self.interface = interface
123 self.host = None
124 self.port = None
125 91
126 def start(self): 92 def accept_client(self, reader, writer):
127 self.prserv = PRServer(self.dbfile, self.logfile, self.interface) 93 return PRServerClient(reader, writer, self.table)
128 self.process = multiprocessing.Process(target=self.prserv.serve_forever)
129 self.process.start()
130 94
131 self.host, self.port = self.prserv.getinfo() 95 def _serve_forever(self):
96 self.db = prserv.db.PRData(self.dbfile)
97 self.table = self.db["PRMAIN"]
132 98
133 def getinfo(self): 99 logger.debug("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
134 return (self.host, self.port) 100 (self.dbfile, self.address, str(os.getpid())))
135 101
136class PRServerConnection(object): 102 super()._serve_forever()
137 def __init__(self, host, port):
138 if is_local_special(host, port):
139 host, port = singleton.getinfo()
140 self.host = host
141 self.port = port
142 self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
143 103
144 def getPR(self, version, pkgarch, checksum): 104 self.table.sync_if_dirty()
145 return self.connection.getPR(version, pkgarch, checksum) 105 self.db.disconnect()
146 106
147 def ping(self): 107 def signal_handler(self):
148 return self.connection.ping() 108 super().signal_handler()
109 if self.table:
110 self.table.sync()
149 111
150 def export(self,version=None, pkgarch=None, checksum=None, colinfo=True): 112class PRServSingleton(object):
151 return self.connection.export(version, pkgarch, checksum, colinfo) 113 def __init__(self, dbfile, logfile, host, port):
114 self.dbfile = dbfile
115 self.logfile = logfile
116 self.host = host
117 self.port = port
152 118
153 def importone(self, version, pkgarch, checksum, value): 119 def start(self):
154 return self.connection.importone(version, pkgarch, checksum, value) 120 self.prserv = PRServer(self.dbfile)
121 self.prserv.start_tcp_server(self.host, self.port)
122 self.process = self.prserv.serve_as_process()
155 123
156 def getinfo(self): 124 if not self.port:
157 return self.host, self.port 125 self.port = int(self.prserv.address.rsplit(':', 1)[1])
158 126
159def run_as_daemon(func, pidfile, logfile): 127def run_as_daemon(func, pidfile, logfile):
160 """ 128 """
@@ -240,15 +208,13 @@ def start_daemon(dbfile, host, port, logfile):
240 % pidfile) 208 % pidfile)
241 return 1 209 return 1
242 210
243 server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) 211 dbfile = os.path.abspath(dbfile)
244 run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile)) 212 def daemon_main():
213 server = PRServer(dbfile)
214 server.start_tcp_server(host, port)
215 server.serve_forever()
245 216
246 # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with 217 run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
247 # the one the server actually is listening, so at least warn the user about it
248 _,rport = server.getinfo()
249 if port != rport:
250 sys.stdout.write("Server is listening at port %s instead of %s\n"
251 % (rport,port))
252 return 0 218 return 0
253 219
254def stop_daemon(host, port): 220def stop_daemon(host, port):
@@ -302,7 +268,7 @@ def is_running(pid):
302 return True 268 return True
303 269
304def is_local_special(host, port): 270def is_local_special(host, port):
305 if host.strip().upper() == 'localhost'.upper() and (not port): 271 if host.strip().lower() == 'localhost' and not port:
306 return True 272 return True
307 else: 273 else:
308 return False 274 return False
@@ -340,20 +306,19 @@ def auto_start(d):
340 auto_shutdown() 306 auto_shutdown()
341 if not singleton: 307 if not singleton:
342 bb.utils.mkdirhier(cachedir) 308 bb.utils.mkdirhier(cachedir)
343 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0)) 309 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), "localhost", 0)
344 singleton.start() 310 singleton.start()
345 if singleton: 311 if singleton:
346 host, port = singleton.getinfo() 312 host = singleton.host
313 port = singleton.port
347 else: 314 else:
348 host = host_params[0] 315 host = host_params[0]
349 port = int(host_params[1]) 316 port = int(host_params[1])
350 317
351 try: 318 try:
352 connection = PRServerConnection(host,port) 319 ping(host, port)
353 connection.ping() 320 return str(host) + ":" + str(port)
354 realhost, realport = connection.getinfo() 321
355 return str(realhost) + ":" + str(realport)
356
357 except Exception: 322 except Exception:
358 logger.critical("PRservice %s:%d not available" % (host, port)) 323 logger.critical("PRservice %s:%d not available" % (host, port))
359 raise PRServiceConfigError 324 raise PRServiceConfigError
@@ -366,8 +331,21 @@ def auto_shutdown():
366 singleton = None 331 singleton = None
367 332
368def ping(host, port): 333def ping(host, port):
369 conn=PRServerConnection(host, port) 334 from . import client
335
336 conn = client.PRClient()
337 conn.connect_tcp(host, port)
370 return conn.ping() 338 return conn.ping()
371 339
372def connect(host, port): 340def connect(host, port):
373 return PRServerConnection(host, port) 341 from . import client
342
343 global singleton
344
345 if host.strip().lower() == 'localhost' and not port:
346 host = 'localhost'
347 port = singleton.port
348
349 conn = client.PRClient()
350 conn.connect_tcp(host, port)
351 return conn