diff options
Diffstat (limited to 'bitbake/lib/prserv/serv.py')
-rw-r--r-- | bitbake/lib/prserv/serv.py | 252 |
1 files changed, 184 insertions, 68 deletions
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py index 5fc8863f70..e175886308 100644 --- a/bitbake/lib/prserv/serv.py +++ b/bitbake/lib/prserv/serv.py | |||
@@ -12,6 +12,7 @@ import sqlite3 | |||
12 | import prserv | 12 | import prserv |
13 | import prserv.db | 13 | import prserv.db |
14 | import errno | 14 | import errno |
15 | from . import create_async_client, revision_smaller, increase_revision | ||
15 | import bb.asyncrpc | 16 | import bb.asyncrpc |
16 | 17 | ||
17 | logger = logging.getLogger("BitBake.PRserv") | 18 | logger = logging.getLogger("BitBake.PRserv") |
@@ -20,16 +21,19 @@ PIDPREFIX = "/tmp/PRServer_%s_%s.pid" | |||
20 | singleton = None | 21 | singleton = None |
21 | 22 | ||
22 | class PRServerClient(bb.asyncrpc.AsyncServerConnection): | 23 | class PRServerClient(bb.asyncrpc.AsyncServerConnection): |
23 | def __init__(self, socket, table, read_only): | 24 | def __init__(self, socket, server): |
24 | super().__init__(socket, 'PRSERVICE', logger) | 25 | super().__init__(socket, "PRSERVICE", server.logger) |
26 | self.server = server | ||
27 | |||
25 | self.handlers.update({ | 28 | self.handlers.update({ |
26 | 'get-pr': self.handle_get_pr, | 29 | "get-pr": self.handle_get_pr, |
27 | 'import-one': self.handle_import_one, | 30 | "test-pr": self.handle_test_pr, |
28 | 'export': self.handle_export, | 31 | "test-package": self.handle_test_package, |
29 | 'is-readonly': self.handle_is_readonly, | 32 | "max-package-pr": self.handle_max_package_pr, |
33 | "import-one": self.handle_import_one, | ||
34 | "export": self.handle_export, | ||
35 | "is-readonly": self.handle_is_readonly, | ||
30 | }) | 36 | }) |
31 | self.table = table | ||
32 | self.read_only = read_only | ||
33 | 37 | ||
34 | def validate_proto_version(self): | 38 | def validate_proto_version(self): |
35 | return (self.proto_version == (1, 0)) | 39 | return (self.proto_version == (1, 0)) |
@@ -38,104 +42,213 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
38 | try: | 42 | try: |
39 | return await super().dispatch_message(msg) | 43 | return await super().dispatch_message(msg) |
40 | except: | 44 | except: |
41 | self.table.sync() | ||
42 | raise | 45 | raise |
43 | else: | 46 | |
44 | self.table.sync_if_dirty() | 47 | async def handle_test_pr(self, request): |
48 | '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value''' | ||
49 | version = request["version"] | ||
50 | pkgarch = request["pkgarch"] | ||
51 | checksum = request["checksum"] | ||
52 | history = request["history"] | ||
53 | |||
54 | value = self.server.table.find_value(version, pkgarch, checksum, history) | ||
55 | return {"value": value} | ||
56 | |||
57 | async def handle_test_package(self, request): | ||
58 | '''Tells whether there are entries for (version, pkgarch) in the db. Returns True or False''' | ||
59 | version = request["version"] | ||
60 | pkgarch = request["pkgarch"] | ||
61 | |||
62 | value = self.server.table.test_package(version, pkgarch) | ||
63 | return {"value": value} | ||
64 | |||
65 | async def handle_max_package_pr(self, request): | ||
66 | '''Finds the greatest PR value for (version, pkgarch) in the db. Returns None if no entry was found''' | ||
67 | version = request["version"] | ||
68 | pkgarch = request["pkgarch"] | ||
69 | |||
70 | value = self.server.table.find_package_max_value(version, pkgarch) | ||
71 | return {"value": value} | ||
45 | 72 | ||
46 | async def handle_get_pr(self, request): | 73 | async def handle_get_pr(self, request): |
47 | version = request['version'] | 74 | version = request["version"] |
48 | pkgarch = request['pkgarch'] | 75 | pkgarch = request["pkgarch"] |
49 | checksum = request['checksum'] | 76 | checksum = request["checksum"] |
77 | history = request["history"] | ||
50 | 78 | ||
51 | response = None | 79 | if self.upstream_client is None: |
52 | try: | 80 | value = self.server.table.get_value(version, pkgarch, checksum, history) |
53 | value = self.table.getValue(version, pkgarch, checksum) | 81 | return {"value": value} |
54 | response = {'value': value} | ||
55 | except prserv.NotFoundError: | ||
56 | logger.error("can not find value for (%s, %s)",version, checksum) | ||
57 | except sqlite3.Error as exc: | ||
58 | logger.error(str(exc)) | ||
59 | 82 | ||
60 | return response | 83 | # We have an upstream server. |
84 | # Check whether the local server already knows the requested configuration. | ||
85 | # If the configuration is a new one, the generated value we will add will | ||
86 | # depend on what's on the upstream server. That's why we're calling find_value() | ||
87 | # instead of get_value() directly. | ||
88 | |||
89 | value = self.server.table.find_value(version, pkgarch, checksum, history) | ||
90 | upstream_max = await self.upstream_client.max_package_pr(version, pkgarch) | ||
91 | |||
92 | if value is not None: | ||
93 | |||
94 | # The configuration is already known locally. | ||
95 | |||
96 | if history: | ||
97 | value = self.server.table.get_value(version, pkgarch, checksum, history) | ||
98 | else: | ||
99 | existing_value = value | ||
100 | # In "no history", we need to make sure the value doesn't decrease | ||
101 | # and is at least greater than the maximum upstream value | ||
102 | # and the maximum local value | ||
103 | |||
104 | local_max = self.server.table.find_package_max_value(version, pkgarch) | ||
105 | if revision_smaller(value, local_max): | ||
106 | value = increase_revision(local_max) | ||
107 | |||
108 | if revision_smaller(value, upstream_max): | ||
109 | # Ask upstream whether it knows the checksum | ||
110 | upstream_value = await self.upstream_client.test_pr(version, pkgarch, checksum) | ||
111 | if upstream_value is None: | ||
112 | # Upstream doesn't have our checksum, let create a new one | ||
113 | value = upstream_max + ".0" | ||
114 | else: | ||
115 | # Fine to take the same value as upstream | ||
116 | value = upstream_max | ||
117 | |||
118 | if not value == existing_value and not self.server.read_only: | ||
119 | self.server.table.store_value(version, pkgarch, checksum, value) | ||
120 | |||
121 | return {"value": value} | ||
122 | |||
123 | # The configuration is a new one for the local server | ||
124 | # Let's ask the upstream server whether it knows it | ||
125 | |||
126 | known_upstream = await self.upstream_client.test_package(version, pkgarch) | ||
127 | |||
128 | if not known_upstream: | ||
129 | |||
130 | # The package is not known upstream, must be a local-only package | ||
131 | # Let's compute the PR number using the local-only method | ||
132 | |||
133 | value = self.server.table.get_value(version, pkgarch, checksum, history) | ||
134 | return {"value": value} | ||
135 | |||
136 | # The package is known upstream, let's ask the upstream server | ||
137 | # whether it knows our new output hash | ||
138 | |||
139 | value = await self.upstream_client.test_pr(version, pkgarch, checksum) | ||
140 | |||
141 | if value is not None: | ||
142 | |||
143 | # Upstream knows this output hash, let's store it and use it too. | ||
144 | |||
145 | if not self.server.read_only: | ||
146 | self.server.table.store_value(version, pkgarch, checksum, value) | ||
147 | # If the local server is read only, won't be able to store the new | ||
148 | # value in the database and will have to keep asking the upstream server | ||
149 | return {"value": value} | ||
150 | |||
151 | # The output hash doesn't exist upstream, get the most recent number from upstream (x) | ||
152 | # Then, we want to have a new PR value for the local server: x.y | ||
153 | |||
154 | upstream_max = await self.upstream_client.max_package_pr(version, pkgarch) | ||
155 | # Here we know that the package is known upstream, so upstream_max can't be None | ||
156 | subvalue = self.server.table.find_new_subvalue(version, pkgarch, upstream_max) | ||
157 | |||
158 | if not self.server.read_only: | ||
159 | self.server.table.store_value(version, pkgarch, checksum, subvalue) | ||
160 | |||
161 | return {"value": subvalue} | ||
162 | |||
163 | async def process_requests(self): | ||
164 | if self.server.upstream is not None: | ||
165 | self.upstream_client = await create_async_client(self.server.upstream) | ||
166 | else: | ||
167 | self.upstream_client = None | ||
168 | |||
169 | try: | ||
170 | await super().process_requests() | ||
171 | finally: | ||
172 | if self.upstream_client is not None: | ||
173 | await self.upstream_client.close() | ||
61 | 174 | ||
62 | async def handle_import_one(self, request): | 175 | async def handle_import_one(self, request): |
63 | response = None | 176 | response = None |
64 | if not self.read_only: | 177 | if not self.server.read_only: |
65 | version = request['version'] | 178 | version = request["version"] |
66 | pkgarch = request['pkgarch'] | 179 | pkgarch = request["pkgarch"] |
67 | checksum = request['checksum'] | 180 | checksum = request["checksum"] |
68 | value = request['value'] | 181 | value = request["value"] |
69 | 182 | ||
70 | value = self.table.importone(version, pkgarch, checksum, value) | 183 | value = self.server.table.importone(version, pkgarch, checksum, value) |
71 | if value is not None: | 184 | if value is not None: |
72 | response = {'value': value} | 185 | response = {"value": value} |
73 | 186 | ||
74 | return response | 187 | return response |
75 | 188 | ||
76 | async def handle_export(self, request): | 189 | async def handle_export(self, request): |
77 | version = request['version'] | 190 | version = request["version"] |
78 | pkgarch = request['pkgarch'] | 191 | pkgarch = request["pkgarch"] |
79 | checksum = request['checksum'] | 192 | checksum = request["checksum"] |
80 | colinfo = request['colinfo'] | 193 | colinfo = request["colinfo"] |
194 | history = request["history"] | ||
81 | 195 | ||
82 | try: | 196 | try: |
83 | (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo) | 197 | (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo, history) |
84 | except sqlite3.Error as exc: | 198 | except sqlite3.Error as exc: |
85 | logger.error(str(exc)) | 199 | self.logger.error(str(exc)) |
86 | metainfo = datainfo = None | 200 | metainfo = datainfo = None |
87 | 201 | ||
88 | return {'metainfo': metainfo, 'datainfo': datainfo} | 202 | return {"metainfo": metainfo, "datainfo": datainfo} |
89 | 203 | ||
90 | async def handle_is_readonly(self, request): | 204 | async def handle_is_readonly(self, request): |
91 | return {'readonly': self.read_only} | 205 | return {"readonly": self.server.read_only} |
92 | 206 | ||
93 | class PRServer(bb.asyncrpc.AsyncServer): | 207 | class PRServer(bb.asyncrpc.AsyncServer): |
94 | def __init__(self, dbfile, read_only=False): | 208 | def __init__(self, dbfile, read_only=False, upstream=None): |
95 | super().__init__(logger) | 209 | super().__init__(logger) |
96 | self.dbfile = dbfile | 210 | self.dbfile = dbfile |
97 | self.table = None | 211 | self.table = None |
98 | self.read_only = read_only | 212 | self.read_only = read_only |
213 | self.upstream = upstream | ||
99 | 214 | ||
100 | def accept_client(self, socket): | 215 | def accept_client(self, socket): |
101 | return PRServerClient(socket, self.table, self.read_only) | 216 | return PRServerClient(socket, self) |
102 | 217 | ||
103 | def start(self): | 218 | def start(self): |
104 | tasks = super().start() | 219 | tasks = super().start() |
105 | self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) | 220 | self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) |
106 | self.table = self.db["PRMAIN"] | 221 | self.table = self.db["PRMAIN"] |
107 | 222 | ||
108 | logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % | 223 | self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % |
109 | (self.dbfile, self.address, str(os.getpid()))) | 224 | (self.dbfile, self.address, str(os.getpid()))) |
110 | 225 | ||
226 | if self.upstream is not None: | ||
227 | self.logger.info("And upstream PRServer: %s " % (self.upstream)) | ||
228 | |||
111 | return tasks | 229 | return tasks |
112 | 230 | ||
113 | async def stop(self): | 231 | async def stop(self): |
114 | self.table.sync_if_dirty() | ||
115 | self.db.disconnect() | 232 | self.db.disconnect() |
116 | await super().stop() | 233 | await super().stop() |
117 | 234 | ||
118 | def signal_handler(self): | ||
119 | super().signal_handler() | ||
120 | if self.table: | ||
121 | self.table.sync() | ||
122 | |||
123 | class PRServSingleton(object): | 235 | class PRServSingleton(object): |
124 | def __init__(self, dbfile, logfile, host, port): | 236 | def __init__(self, dbfile, logfile, host, port, upstream): |
125 | self.dbfile = dbfile | 237 | self.dbfile = dbfile |
126 | self.logfile = logfile | 238 | self.logfile = logfile |
127 | self.host = host | 239 | self.host = host |
128 | self.port = port | 240 | self.port = port |
241 | self.upstream = upstream | ||
129 | 242 | ||
130 | def start(self): | 243 | def start(self): |
131 | self.prserv = PRServer(self.dbfile) | 244 | self.prserv = PRServer(self.dbfile, upstream=self.upstream) |
132 | self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) | 245 | self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) |
133 | self.process = self.prserv.serve_as_process(log_level=logging.WARNING) | 246 | self.process = self.prserv.serve_as_process(log_level=logging.WARNING) |
134 | 247 | ||
135 | if not self.prserv.address: | 248 | if not self.prserv.address: |
136 | raise PRServiceConfigError | 249 | raise PRServiceConfigError |
137 | if not self.port: | 250 | if not self.port: |
138 | self.port = int(self.prserv.address.rsplit(':', 1)[1]) | 251 | self.port = int(self.prserv.address.rsplit(":", 1)[1]) |
139 | 252 | ||
140 | def run_as_daemon(func, pidfile, logfile): | 253 | def run_as_daemon(func, pidfile, logfile): |
141 | """ | 254 | """ |
@@ -171,18 +284,18 @@ def run_as_daemon(func, pidfile, logfile): | |||
171 | # stdout/stderr or it could be 'real' unix fd forking where we need | 284 | # stdout/stderr or it could be 'real' unix fd forking where we need |
172 | # to physically close the fds to prevent the program launching us from | 285 | # to physically close the fds to prevent the program launching us from |
173 | # potentially hanging on a pipe. Handle both cases. | 286 | # potentially hanging on a pipe. Handle both cases. |
174 | si = open('/dev/null', 'r') | 287 | si = open("/dev/null", "r") |
175 | try: | 288 | try: |
176 | os.dup2(si.fileno(),sys.stdin.fileno()) | 289 | os.dup2(si.fileno(), sys.stdin.fileno()) |
177 | except (AttributeError, io.UnsupportedOperation): | 290 | except (AttributeError, io.UnsupportedOperation): |
178 | sys.stdin = si | 291 | sys.stdin = si |
179 | so = open(logfile, 'a+') | 292 | so = open(logfile, "a+") |
180 | try: | 293 | try: |
181 | os.dup2(so.fileno(),sys.stdout.fileno()) | 294 | os.dup2(so.fileno(), sys.stdout.fileno()) |
182 | except (AttributeError, io.UnsupportedOperation): | 295 | except (AttributeError, io.UnsupportedOperation): |
183 | sys.stdout = so | 296 | sys.stdout = so |
184 | try: | 297 | try: |
185 | os.dup2(so.fileno(),sys.stderr.fileno()) | 298 | os.dup2(so.fileno(), sys.stderr.fileno()) |
186 | except (AttributeError, io.UnsupportedOperation): | 299 | except (AttributeError, io.UnsupportedOperation): |
187 | sys.stderr = so | 300 | sys.stderr = so |
188 | 301 | ||
@@ -200,14 +313,14 @@ def run_as_daemon(func, pidfile, logfile): | |||
200 | 313 | ||
201 | # write pidfile | 314 | # write pidfile |
202 | pid = str(os.getpid()) | 315 | pid = str(os.getpid()) |
203 | with open(pidfile, 'w') as pf: | 316 | with open(pidfile, "w") as pf: |
204 | pf.write("%s\n" % pid) | 317 | pf.write("%s\n" % pid) |
205 | 318 | ||
206 | func() | 319 | func() |
207 | os.remove(pidfile) | 320 | os.remove(pidfile) |
208 | os._exit(0) | 321 | os._exit(0) |
209 | 322 | ||
210 | def start_daemon(dbfile, host, port, logfile, read_only=False): | 323 | def start_daemon(dbfile, host, port, logfile, read_only=False, upstream=None): |
211 | ip = socket.gethostbyname(host) | 324 | ip = socket.gethostbyname(host) |
212 | pidfile = PIDPREFIX % (ip, port) | 325 | pidfile = PIDPREFIX % (ip, port) |
213 | try: | 326 | try: |
@@ -223,7 +336,7 @@ def start_daemon(dbfile, host, port, logfile, read_only=False): | |||
223 | 336 | ||
224 | dbfile = os.path.abspath(dbfile) | 337 | dbfile = os.path.abspath(dbfile) |
225 | def daemon_main(): | 338 | def daemon_main(): |
226 | server = PRServer(dbfile, read_only=read_only) | 339 | server = PRServer(dbfile, read_only=read_only, upstream=upstream) |
227 | server.start_tcp_server(ip, port) | 340 | server.start_tcp_server(ip, port) |
228 | server.serve_forever() | 341 | server.serve_forever() |
229 | 342 | ||
@@ -245,15 +358,15 @@ def stop_daemon(host, port): | |||
245 | # so at least advise the user which ports the corresponding server is listening | 358 | # so at least advise the user which ports the corresponding server is listening |
246 | ports = [] | 359 | ports = [] |
247 | portstr = "" | 360 | portstr = "" |
248 | for pf in glob.glob(PIDPREFIX % (ip,'*')): | 361 | for pf in glob.glob(PIDPREFIX % (ip, "*")): |
249 | bn = os.path.basename(pf) | 362 | bn = os.path.basename(pf) |
250 | root, _ = os.path.splitext(bn) | 363 | root, _ = os.path.splitext(bn) |
251 | ports.append(root.split('_')[-1]) | 364 | ports.append(root.split("_")[-1]) |
252 | if len(ports): | 365 | if len(ports): |
253 | portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports)) | 366 | portstr = "Wrong port? Other ports listening at %s: %s" % (host, " ".join(ports)) |
254 | 367 | ||
255 | sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n" | 368 | sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n" |
256 | % (pidfile,portstr)) | 369 | % (pidfile, portstr)) |
257 | return 1 | 370 | return 1 |
258 | 371 | ||
259 | try: | 372 | try: |
@@ -284,7 +397,7 @@ def is_running(pid): | |||
284 | return True | 397 | return True |
285 | 398 | ||
286 | def is_local_special(host, port): | 399 | def is_local_special(host, port): |
287 | if (host == 'localhost' or host == '127.0.0.1') and not port: | 400 | if (host == "localhost" or host == "127.0.0.1") and not port: |
288 | return True | 401 | return True |
289 | else: | 402 | else: |
290 | return False | 403 | return False |
@@ -295,7 +408,7 @@ class PRServiceConfigError(Exception): | |||
295 | def auto_start(d): | 408 | def auto_start(d): |
296 | global singleton | 409 | global singleton |
297 | 410 | ||
298 | host_params = list(filter(None, (d.getVar('PRSERV_HOST') or '').split(':'))) | 411 | host_params = list(filter(None, (d.getVar("PRSERV_HOST") or "").split(":"))) |
299 | if not host_params: | 412 | if not host_params: |
300 | # Shutdown any existing PR Server | 413 | # Shutdown any existing PR Server |
301 | auto_shutdown() | 414 | auto_shutdown() |
@@ -304,12 +417,15 @@ def auto_start(d): | |||
304 | if len(host_params) != 2: | 417 | if len(host_params) != 2: |
305 | # Shutdown any existing PR Server | 418 | # Shutdown any existing PR Server |
306 | auto_shutdown() | 419 | auto_shutdown() |
307 | logger.critical('\n'.join(['PRSERV_HOST: incorrect format', | 420 | logger.critical("\n".join(["PRSERV_HOST: incorrect format", |
308 | 'Usage: PRSERV_HOST = "<hostname>:<port>"'])) | 421 | 'Usage: PRSERV_HOST = "<hostname>:<port>"'])) |
309 | raise PRServiceConfigError | 422 | raise PRServiceConfigError |
310 | 423 | ||
311 | host = host_params[0].strip().lower() | 424 | host = host_params[0].strip().lower() |
312 | port = int(host_params[1]) | 425 | port = int(host_params[1]) |
426 | |||
427 | upstream = d.getVar("PRSERV_UPSTREAM") or None | ||
428 | |||
313 | if is_local_special(host, port): | 429 | if is_local_special(host, port): |
314 | import bb.utils | 430 | import bb.utils |
315 | cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) | 431 | cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) |
@@ -324,7 +440,7 @@ def auto_start(d): | |||
324 | auto_shutdown() | 440 | auto_shutdown() |
325 | if not singleton: | 441 | if not singleton: |
326 | bb.utils.mkdirhier(cachedir) | 442 | bb.utils.mkdirhier(cachedir) |
327 | singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port) | 443 | singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port, upstream) |
328 | singleton.start() | 444 | singleton.start() |
329 | if singleton: | 445 | if singleton: |
330 | host = singleton.host | 446 | host = singleton.host |
@@ -357,8 +473,8 @@ def connect(host, port): | |||
357 | 473 | ||
358 | global singleton | 474 | global singleton |
359 | 475 | ||
360 | if host.strip().lower() == 'localhost' and not port: | 476 | if host.strip().lower() == "localhost" and not port: |
361 | host = 'localhost' | 477 | host = "localhost" |
362 | port = singleton.port | 478 | port = singleton.port |
363 | 479 | ||
364 | conn = client.PRClient() | 480 | conn = client.PRClient() |