summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/prserv/serv.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/prserv/serv.py')
-rw-r--r--bitbake/lib/prserv/serv.py252
1 files changed, 184 insertions, 68 deletions
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py
index 5fc8863f70..e175886308 100644
--- a/bitbake/lib/prserv/serv.py
+++ b/bitbake/lib/prserv/serv.py
@@ -12,6 +12,7 @@ import sqlite3
12import prserv 12import prserv
13import prserv.db 13import prserv.db
14import errno 14import errno
15from . import create_async_client, revision_smaller, increase_revision
15import bb.asyncrpc 16import bb.asyncrpc
16 17
17logger = logging.getLogger("BitBake.PRserv") 18logger = logging.getLogger("BitBake.PRserv")
@@ -20,16 +21,19 @@ PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
20singleton = None 21singleton = None
21 22
22class PRServerClient(bb.asyncrpc.AsyncServerConnection): 23class PRServerClient(bb.asyncrpc.AsyncServerConnection):
23 def __init__(self, socket, table, read_only): 24 def __init__(self, socket, server):
24 super().__init__(socket, 'PRSERVICE', logger) 25 super().__init__(socket, "PRSERVICE", server.logger)
26 self.server = server
27
25 self.handlers.update({ 28 self.handlers.update({
26 'get-pr': self.handle_get_pr, 29 "get-pr": self.handle_get_pr,
27 'import-one': self.handle_import_one, 30 "test-pr": self.handle_test_pr,
28 'export': self.handle_export, 31 "test-package": self.handle_test_package,
29 'is-readonly': self.handle_is_readonly, 32 "max-package-pr": self.handle_max_package_pr,
33 "import-one": self.handle_import_one,
34 "export": self.handle_export,
35 "is-readonly": self.handle_is_readonly,
30 }) 36 })
31 self.table = table
32 self.read_only = read_only
33 37
34 def validate_proto_version(self): 38 def validate_proto_version(self):
35 return (self.proto_version == (1, 0)) 39 return (self.proto_version == (1, 0))
@@ -38,104 +42,213 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
38 try: 42 try:
39 return await super().dispatch_message(msg) 43 return await super().dispatch_message(msg)
40 except: 44 except:
41 self.table.sync()
42 raise 45 raise
43 else: 46
44 self.table.sync_if_dirty() 47 async def handle_test_pr(self, request):
48 '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value'''
49 version = request["version"]
50 pkgarch = request["pkgarch"]
51 checksum = request["checksum"]
52 history = request["history"]
53
54 value = self.server.table.find_value(version, pkgarch, checksum, history)
55 return {"value": value}
56
57 async def handle_test_package(self, request):
58 '''Tells whether there are entries for (version, pkgarch) in the db. Returns True or False'''
59 version = request["version"]
60 pkgarch = request["pkgarch"]
61
62 value = self.server.table.test_package(version, pkgarch)
63 return {"value": value}
64
65 async def handle_max_package_pr(self, request):
66 '''Finds the greatest PR value for (version, pkgarch) in the db. Returns None if no entry was found'''
67 version = request["version"]
68 pkgarch = request["pkgarch"]
69
70 value = self.server.table.find_package_max_value(version, pkgarch)
71 return {"value": value}
45 72
46 async def handle_get_pr(self, request): 73 async def handle_get_pr(self, request):
47 version = request['version'] 74 version = request["version"]
48 pkgarch = request['pkgarch'] 75 pkgarch = request["pkgarch"]
49 checksum = request['checksum'] 76 checksum = request["checksum"]
77 history = request["history"]
50 78
51 response = None 79 if self.upstream_client is None:
52 try: 80 value = self.server.table.get_value(version, pkgarch, checksum, history)
53 value = self.table.getValue(version, pkgarch, checksum) 81 return {"value": value}
54 response = {'value': value}
55 except prserv.NotFoundError:
56 logger.error("can not find value for (%s, %s)",version, checksum)
57 except sqlite3.Error as exc:
58 logger.error(str(exc))
59 82
60 return response 83 # We have an upstream server.
84 # Check whether the local server already knows the requested configuration.
85 # If the configuration is a new one, the generated value we will add will
86 # depend on what's on the upstream server. That's why we're calling find_value()
87 # instead of get_value() directly.
88
89 value = self.server.table.find_value(version, pkgarch, checksum, history)
90 upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)
91
92 if value is not None:
93
94 # The configuration is already known locally.
95
96 if history:
97 value = self.server.table.get_value(version, pkgarch, checksum, history)
98 else:
99 existing_value = value
100 # In "no history", we need to make sure the value doesn't decrease
101 # and is at least greater than the maximum upstream value
102 # and the maximum local value
103
104 local_max = self.server.table.find_package_max_value(version, pkgarch)
105 if revision_smaller(value, local_max):
106 value = increase_revision(local_max)
107
108 if revision_smaller(value, upstream_max):
109 # Ask upstream whether it knows the checksum
110 upstream_value = await self.upstream_client.test_pr(version, pkgarch, checksum)
111 if upstream_value is None:
112 # Upstream doesn't have our checksum, let create a new one
113 value = upstream_max + ".0"
114 else:
115 # Fine to take the same value as upstream
116 value = upstream_max
117
118 if not value == existing_value and not self.server.read_only:
119 self.server.table.store_value(version, pkgarch, checksum, value)
120
121 return {"value": value}
122
123 # The configuration is a new one for the local server
124 # Let's ask the upstream server whether it knows it
125
126 known_upstream = await self.upstream_client.test_package(version, pkgarch)
127
128 if not known_upstream:
129
130 # The package is not known upstream, must be a local-only package
131 # Let's compute the PR number using the local-only method
132
133 value = self.server.table.get_value(version, pkgarch, checksum, history)
134 return {"value": value}
135
136 # The package is known upstream, let's ask the upstream server
137 # whether it knows our new output hash
138
139 value = await self.upstream_client.test_pr(version, pkgarch, checksum)
140
141 if value is not None:
142
143 # Upstream knows this output hash, let's store it and use it too.
144
145 if not self.server.read_only:
146 self.server.table.store_value(version, pkgarch, checksum, value)
147 # If the local server is read only, won't be able to store the new
148 # value in the database and will have to keep asking the upstream server
149 return {"value": value}
150
151 # The output hash doesn't exist upstream, get the most recent number from upstream (x)
152 # Then, we want to have a new PR value for the local server: x.y
153
154 upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)
155 # Here we know that the package is known upstream, so upstream_max can't be None
156 subvalue = self.server.table.find_new_subvalue(version, pkgarch, upstream_max)
157
158 if not self.server.read_only:
159 self.server.table.store_value(version, pkgarch, checksum, subvalue)
160
161 return {"value": subvalue}
162
163 async def process_requests(self):
164 if self.server.upstream is not None:
165 self.upstream_client = await create_async_client(self.server.upstream)
166 else:
167 self.upstream_client = None
168
169 try:
170 await super().process_requests()
171 finally:
172 if self.upstream_client is not None:
173 await self.upstream_client.close()
61 174
62 async def handle_import_one(self, request): 175 async def handle_import_one(self, request):
63 response = None 176 response = None
64 if not self.read_only: 177 if not self.server.read_only:
65 version = request['version'] 178 version = request["version"]
66 pkgarch = request['pkgarch'] 179 pkgarch = request["pkgarch"]
67 checksum = request['checksum'] 180 checksum = request["checksum"]
68 value = request['value'] 181 value = request["value"]
69 182
70 value = self.table.importone(version, pkgarch, checksum, value) 183 value = self.server.table.importone(version, pkgarch, checksum, value)
71 if value is not None: 184 if value is not None:
72 response = {'value': value} 185 response = {"value": value}
73 186
74 return response 187 return response
75 188
76 async def handle_export(self, request): 189 async def handle_export(self, request):
77 version = request['version'] 190 version = request["version"]
78 pkgarch = request['pkgarch'] 191 pkgarch = request["pkgarch"]
79 checksum = request['checksum'] 192 checksum = request["checksum"]
80 colinfo = request['colinfo'] 193 colinfo = request["colinfo"]
194 history = request["history"]
81 195
82 try: 196 try:
83 (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo) 197 (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo, history)
84 except sqlite3.Error as exc: 198 except sqlite3.Error as exc:
85 logger.error(str(exc)) 199 self.logger.error(str(exc))
86 metainfo = datainfo = None 200 metainfo = datainfo = None
87 201
88 return {'metainfo': metainfo, 'datainfo': datainfo} 202 return {"metainfo": metainfo, "datainfo": datainfo}
89 203
90 async def handle_is_readonly(self, request): 204 async def handle_is_readonly(self, request):
91 return {'readonly': self.read_only} 205 return {"readonly": self.server.read_only}
92 206
93class PRServer(bb.asyncrpc.AsyncServer): 207class PRServer(bb.asyncrpc.AsyncServer):
94 def __init__(self, dbfile, read_only=False): 208 def __init__(self, dbfile, read_only=False, upstream=None):
95 super().__init__(logger) 209 super().__init__(logger)
96 self.dbfile = dbfile 210 self.dbfile = dbfile
97 self.table = None 211 self.table = None
98 self.read_only = read_only 212 self.read_only = read_only
213 self.upstream = upstream
99 214
100 def accept_client(self, socket): 215 def accept_client(self, socket):
101 return PRServerClient(socket, self.table, self.read_only) 216 return PRServerClient(socket, self)
102 217
103 def start(self): 218 def start(self):
104 tasks = super().start() 219 tasks = super().start()
105 self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) 220 self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only)
106 self.table = self.db["PRMAIN"] 221 self.table = self.db["PRMAIN"]
107 222
108 logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % 223 self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
109 (self.dbfile, self.address, str(os.getpid()))) 224 (self.dbfile, self.address, str(os.getpid())))
110 225
226 if self.upstream is not None:
227 self.logger.info("And upstream PRServer: %s " % (self.upstream))
228
111 return tasks 229 return tasks
112 230
113 async def stop(self): 231 async def stop(self):
114 self.table.sync_if_dirty()
115 self.db.disconnect() 232 self.db.disconnect()
116 await super().stop() 233 await super().stop()
117 234
118 def signal_handler(self):
119 super().signal_handler()
120 if self.table:
121 self.table.sync()
122
123class PRServSingleton(object): 235class PRServSingleton(object):
124 def __init__(self, dbfile, logfile, host, port): 236 def __init__(self, dbfile, logfile, host, port, upstream):
125 self.dbfile = dbfile 237 self.dbfile = dbfile
126 self.logfile = logfile 238 self.logfile = logfile
127 self.host = host 239 self.host = host
128 self.port = port 240 self.port = port
241 self.upstream = upstream
129 242
130 def start(self): 243 def start(self):
131 self.prserv = PRServer(self.dbfile) 244 self.prserv = PRServer(self.dbfile, upstream=self.upstream)
132 self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) 245 self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
133 self.process = self.prserv.serve_as_process(log_level=logging.WARNING) 246 self.process = self.prserv.serve_as_process(log_level=logging.WARNING)
134 247
135 if not self.prserv.address: 248 if not self.prserv.address:
136 raise PRServiceConfigError 249 raise PRServiceConfigError
137 if not self.port: 250 if not self.port:
138 self.port = int(self.prserv.address.rsplit(':', 1)[1]) 251 self.port = int(self.prserv.address.rsplit(":", 1)[1])
139 252
140def run_as_daemon(func, pidfile, logfile): 253def run_as_daemon(func, pidfile, logfile):
141 """ 254 """
@@ -171,18 +284,18 @@ def run_as_daemon(func, pidfile, logfile):
171 # stdout/stderr or it could be 'real' unix fd forking where we need 284 # stdout/stderr or it could be 'real' unix fd forking where we need
172 # to physically close the fds to prevent the program launching us from 285 # to physically close the fds to prevent the program launching us from
173 # potentially hanging on a pipe. Handle both cases. 286 # potentially hanging on a pipe. Handle both cases.
174 si = open('/dev/null', 'r') 287 si = open("/dev/null", "r")
175 try: 288 try:
176 os.dup2(si.fileno(),sys.stdin.fileno()) 289 os.dup2(si.fileno(), sys.stdin.fileno())
177 except (AttributeError, io.UnsupportedOperation): 290 except (AttributeError, io.UnsupportedOperation):
178 sys.stdin = si 291 sys.stdin = si
179 so = open(logfile, 'a+') 292 so = open(logfile, "a+")
180 try: 293 try:
181 os.dup2(so.fileno(),sys.stdout.fileno()) 294 os.dup2(so.fileno(), sys.stdout.fileno())
182 except (AttributeError, io.UnsupportedOperation): 295 except (AttributeError, io.UnsupportedOperation):
183 sys.stdout = so 296 sys.stdout = so
184 try: 297 try:
185 os.dup2(so.fileno(),sys.stderr.fileno()) 298 os.dup2(so.fileno(), sys.stderr.fileno())
186 except (AttributeError, io.UnsupportedOperation): 299 except (AttributeError, io.UnsupportedOperation):
187 sys.stderr = so 300 sys.stderr = so
188 301
@@ -200,14 +313,14 @@ def run_as_daemon(func, pidfile, logfile):
200 313
201 # write pidfile 314 # write pidfile
202 pid = str(os.getpid()) 315 pid = str(os.getpid())
203 with open(pidfile, 'w') as pf: 316 with open(pidfile, "w") as pf:
204 pf.write("%s\n" % pid) 317 pf.write("%s\n" % pid)
205 318
206 func() 319 func()
207 os.remove(pidfile) 320 os.remove(pidfile)
208 os._exit(0) 321 os._exit(0)
209 322
210def start_daemon(dbfile, host, port, logfile, read_only=False): 323def start_daemon(dbfile, host, port, logfile, read_only=False, upstream=None):
211 ip = socket.gethostbyname(host) 324 ip = socket.gethostbyname(host)
212 pidfile = PIDPREFIX % (ip, port) 325 pidfile = PIDPREFIX % (ip, port)
213 try: 326 try:
@@ -223,7 +336,7 @@ def start_daemon(dbfile, host, port, logfile, read_only=False):
223 336
224 dbfile = os.path.abspath(dbfile) 337 dbfile = os.path.abspath(dbfile)
225 def daemon_main(): 338 def daemon_main():
226 server = PRServer(dbfile, read_only=read_only) 339 server = PRServer(dbfile, read_only=read_only, upstream=upstream)
227 server.start_tcp_server(ip, port) 340 server.start_tcp_server(ip, port)
228 server.serve_forever() 341 server.serve_forever()
229 342
@@ -245,15 +358,15 @@ def stop_daemon(host, port):
245 # so at least advise the user which ports the corresponding server is listening 358 # so at least advise the user which ports the corresponding server is listening
246 ports = [] 359 ports = []
247 portstr = "" 360 portstr = ""
248 for pf in glob.glob(PIDPREFIX % (ip,'*')): 361 for pf in glob.glob(PIDPREFIX % (ip, "*")):
249 bn = os.path.basename(pf) 362 bn = os.path.basename(pf)
250 root, _ = os.path.splitext(bn) 363 root, _ = os.path.splitext(bn)
251 ports.append(root.split('_')[-1]) 364 ports.append(root.split("_")[-1])
252 if len(ports): 365 if len(ports):
253 portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports)) 366 portstr = "Wrong port? Other ports listening at %s: %s" % (host, " ".join(ports))
254 367
255 sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n" 368 sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
256 % (pidfile,portstr)) 369 % (pidfile, portstr))
257 return 1 370 return 1
258 371
259 try: 372 try:
@@ -284,7 +397,7 @@ def is_running(pid):
284 return True 397 return True
285 398
286def is_local_special(host, port): 399def is_local_special(host, port):
287 if (host == 'localhost' or host == '127.0.0.1') and not port: 400 if (host == "localhost" or host == "127.0.0.1") and not port:
288 return True 401 return True
289 else: 402 else:
290 return False 403 return False
@@ -295,7 +408,7 @@ class PRServiceConfigError(Exception):
295def auto_start(d): 408def auto_start(d):
296 global singleton 409 global singleton
297 410
298 host_params = list(filter(None, (d.getVar('PRSERV_HOST') or '').split(':'))) 411 host_params = list(filter(None, (d.getVar("PRSERV_HOST") or "").split(":")))
299 if not host_params: 412 if not host_params:
300 # Shutdown any existing PR Server 413 # Shutdown any existing PR Server
301 auto_shutdown() 414 auto_shutdown()
@@ -304,12 +417,15 @@ def auto_start(d):
304 if len(host_params) != 2: 417 if len(host_params) != 2:
305 # Shutdown any existing PR Server 418 # Shutdown any existing PR Server
306 auto_shutdown() 419 auto_shutdown()
307 logger.critical('\n'.join(['PRSERV_HOST: incorrect format', 420 logger.critical("\n".join(["PRSERV_HOST: incorrect format",
308 'Usage: PRSERV_HOST = "<hostname>:<port>"'])) 421 'Usage: PRSERV_HOST = "<hostname>:<port>"']))
309 raise PRServiceConfigError 422 raise PRServiceConfigError
310 423
311 host = host_params[0].strip().lower() 424 host = host_params[0].strip().lower()
312 port = int(host_params[1]) 425 port = int(host_params[1])
426
427 upstream = d.getVar("PRSERV_UPSTREAM") or None
428
313 if is_local_special(host, port): 429 if is_local_special(host, port):
314 import bb.utils 430 import bb.utils
315 cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) 431 cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
@@ -324,7 +440,7 @@ def auto_start(d):
324 auto_shutdown() 440 auto_shutdown()
325 if not singleton: 441 if not singleton:
326 bb.utils.mkdirhier(cachedir) 442 bb.utils.mkdirhier(cachedir)
327 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port) 443 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port, upstream)
328 singleton.start() 444 singleton.start()
329 if singleton: 445 if singleton:
330 host = singleton.host 446 host = singleton.host
@@ -357,8 +473,8 @@ def connect(host, port):
357 473
358 global singleton 474 global singleton
359 475
360 if host.strip().lower() == 'localhost' and not port: 476 if host.strip().lower() == "localhost" and not port:
361 host = 'localhost' 477 host = "localhost"
362 port = singleton.port 478 port = singleton.port
363 479
364 conn = client.PRClient() 480 conn = client.PRClient()