diff options
Diffstat (limited to 'bitbake/lib/prserv/serv.py')
| -rw-r--r-- | bitbake/lib/prserv/serv.py | 140 | 
1 files changed, 115 insertions, 25 deletions
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py index dc4be5b620..e175886308 100644 --- a/bitbake/lib/prserv/serv.py +++ b/bitbake/lib/prserv/serv.py  | |||
| @@ -12,6 +12,7 @@ import sqlite3 | |||
| 12 | import prserv | 12 | import prserv | 
| 13 | import prserv.db | 13 | import prserv.db | 
| 14 | import errno | 14 | import errno | 
| 15 | from . import create_async_client, revision_smaller, increase_revision | ||
| 15 | import bb.asyncrpc | 16 | import bb.asyncrpc | 
| 16 | 17 | ||
| 17 | logger = logging.getLogger("BitBake.PRserv") | 18 | logger = logging.getLogger("BitBake.PRserv") | 
| @@ -41,18 +42,16 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 41 | try: | 42 | try: | 
| 42 | return await super().dispatch_message(msg) | 43 | return await super().dispatch_message(msg) | 
| 43 | except: | 44 | except: | 
| 44 | self.server.table.sync() | ||
| 45 | raise | 45 | raise | 
| 46 | else: | ||
| 47 | self.server.table.sync_if_dirty() | ||
| 48 | 46 | ||
| 49 | async def handle_test_pr(self, request): | 47 | async def handle_test_pr(self, request): | 
| 50 | '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value''' | 48 | '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value''' | 
| 51 | version = request["version"] | 49 | version = request["version"] | 
| 52 | pkgarch = request["pkgarch"] | 50 | pkgarch = request["pkgarch"] | 
| 53 | checksum = request["checksum"] | 51 | checksum = request["checksum"] | 
| 52 | history = request["history"] | ||
| 54 | 53 | ||
| 55 | value = self.server.table.find_value(version, pkgarch, checksum) | 54 | value = self.server.table.find_value(version, pkgarch, checksum, history) | 
| 56 | return {"value": value} | 55 | return {"value": value} | 
| 57 | 56 | ||
| 58 | async def handle_test_package(self, request): | 57 | async def handle_test_package(self, request): | 
| @@ -68,22 +67,110 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 68 | version = request["version"] | 67 | version = request["version"] | 
| 69 | pkgarch = request["pkgarch"] | 68 | pkgarch = request["pkgarch"] | 
| 70 | 69 | ||
| 71 | value = self.server.table.find_max_value(version, pkgarch) | 70 | value = self.server.table.find_package_max_value(version, pkgarch) | 
| 72 | return {"value": value} | 71 | return {"value": value} | 
| 73 | 72 | ||
| 74 | async def handle_get_pr(self, request): | 73 | async def handle_get_pr(self, request): | 
| 75 | version = request["version"] | 74 | version = request["version"] | 
| 76 | pkgarch = request["pkgarch"] | 75 | pkgarch = request["pkgarch"] | 
| 77 | checksum = request["checksum"] | 76 | checksum = request["checksum"] | 
| 77 | history = request["history"] | ||
| 78 | 78 | ||
| 79 | response = None | 79 | if self.upstream_client is None: | 
| 80 | try: | 80 | value = self.server.table.get_value(version, pkgarch, checksum, history) | 
| 81 | value = self.server.table.get_value(version, pkgarch, checksum) | 81 | return {"value": value} | 
| 82 | response = {"value": value} | ||
| 83 | except prserv.NotFoundError: | ||
| 84 | self.logger.error("failure storing value in database for (%s, %s)",version, checksum) | ||
| 85 | 82 | ||
| 86 | return response | 83 | # We have an upstream server. | 
| 84 | # Check whether the local server already knows the requested configuration. | ||
| 85 | # If the configuration is a new one, the generated value we will add will | ||
| 86 | # depend on what's on the upstream server. That's why we're calling find_value() | ||
| 87 | # instead of get_value() directly. | ||
| 88 | |||
| 89 | value = self.server.table.find_value(version, pkgarch, checksum, history) | ||
| 90 | upstream_max = await self.upstream_client.max_package_pr(version, pkgarch) | ||
| 91 | |||
| 92 | if value is not None: | ||
| 93 | |||
| 94 | # The configuration is already known locally. | ||
| 95 | |||
| 96 | if history: | ||
| 97 | value = self.server.table.get_value(version, pkgarch, checksum, history) | ||
| 98 | else: | ||
| 99 | existing_value = value | ||
| 100 | # In "no history", we need to make sure the value doesn't decrease | ||
| 101 | # and is at least greater than the maximum upstream value | ||
| 102 | # and the maximum local value | ||
| 103 | |||
| 104 | local_max = self.server.table.find_package_max_value(version, pkgarch) | ||
| 105 | if revision_smaller(value, local_max): | ||
| 106 | value = increase_revision(local_max) | ||
| 107 | |||
| 108 | if revision_smaller(value, upstream_max): | ||
| 109 | # Ask upstream whether it knows the checksum | ||
| 110 | upstream_value = await self.upstream_client.test_pr(version, pkgarch, checksum) | ||
| 111 | if upstream_value is None: | ||
| 112 | # Upstream doesn't have our checksum, let create a new one | ||
| 113 | value = upstream_max + ".0" | ||
| 114 | else: | ||
| 115 | # Fine to take the same value as upstream | ||
| 116 | value = upstream_max | ||
| 117 | |||
| 118 | if not value == existing_value and not self.server.read_only: | ||
| 119 | self.server.table.store_value(version, pkgarch, checksum, value) | ||
| 120 | |||
| 121 | return {"value": value} | ||
| 122 | |||
| 123 | # The configuration is a new one for the local server | ||
| 124 | # Let's ask the upstream server whether it knows it | ||
| 125 | |||
| 126 | known_upstream = await self.upstream_client.test_package(version, pkgarch) | ||
| 127 | |||
| 128 | if not known_upstream: | ||
| 129 | |||
| 130 | # The package is not known upstream, must be a local-only package | ||
| 131 | # Let's compute the PR number using the local-only method | ||
| 132 | |||
| 133 | value = self.server.table.get_value(version, pkgarch, checksum, history) | ||
| 134 | return {"value": value} | ||
| 135 | |||
| 136 | # The package is known upstream, let's ask the upstream server | ||
| 137 | # whether it knows our new output hash | ||
| 138 | |||
| 139 | value = await self.upstream_client.test_pr(version, pkgarch, checksum) | ||
| 140 | |||
| 141 | if value is not None: | ||
| 142 | |||
| 143 | # Upstream knows this output hash, let's store it and use it too. | ||
| 144 | |||
| 145 | if not self.server.read_only: | ||
| 146 | self.server.table.store_value(version, pkgarch, checksum, value) | ||
| 147 | # If the local server is read only, won't be able to store the new | ||
| 148 | # value in the database and will have to keep asking the upstream server | ||
| 149 | return {"value": value} | ||
| 150 | |||
| 151 | # The output hash doesn't exist upstream, get the most recent number from upstream (x) | ||
| 152 | # Then, we want to have a new PR value for the local server: x.y | ||
| 153 | |||
| 154 | upstream_max = await self.upstream_client.max_package_pr(version, pkgarch) | ||
| 155 | # Here we know that the package is known upstream, so upstream_max can't be None | ||
| 156 | subvalue = self.server.table.find_new_subvalue(version, pkgarch, upstream_max) | ||
| 157 | |||
| 158 | if not self.server.read_only: | ||
| 159 | self.server.table.store_value(version, pkgarch, checksum, subvalue) | ||
| 160 | |||
| 161 | return {"value": subvalue} | ||
| 162 | |||
| 163 | async def process_requests(self): | ||
| 164 | if self.server.upstream is not None: | ||
| 165 | self.upstream_client = await create_async_client(self.server.upstream) | ||
| 166 | else: | ||
| 167 | self.upstream_client = None | ||
| 168 | |||
| 169 | try: | ||
| 170 | await super().process_requests() | ||
| 171 | finally: | ||
| 172 | if self.upstream_client is not None: | ||
| 173 | await self.upstream_client.close() | ||
| 87 | 174 | ||
| 88 | async def handle_import_one(self, request): | 175 | async def handle_import_one(self, request): | 
| 89 | response = None | 176 | response = None | 
| @@ -104,9 +191,10 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 104 | pkgarch = request["pkgarch"] | 191 | pkgarch = request["pkgarch"] | 
| 105 | checksum = request["checksum"] | 192 | checksum = request["checksum"] | 
| 106 | colinfo = request["colinfo"] | 193 | colinfo = request["colinfo"] | 
| 194 | history = request["history"] | ||
| 107 | 195 | ||
| 108 | try: | 196 | try: | 
| 109 | (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo) | 197 | (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo, history) | 
| 110 | except sqlite3.Error as exc: | 198 | except sqlite3.Error as exc: | 
| 111 | self.logger.error(str(exc)) | 199 | self.logger.error(str(exc)) | 
| 112 | metainfo = datainfo = None | 200 | metainfo = datainfo = None | 
| @@ -117,11 +205,12 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 117 | return {"readonly": self.server.read_only} | 205 | return {"readonly": self.server.read_only} | 
| 118 | 206 | ||
| 119 | class PRServer(bb.asyncrpc.AsyncServer): | 207 | class PRServer(bb.asyncrpc.AsyncServer): | 
| 120 | def __init__(self, dbfile, read_only=False): | 208 | def __init__(self, dbfile, read_only=False, upstream=None): | 
| 121 | super().__init__(logger) | 209 | super().__init__(logger) | 
| 122 | self.dbfile = dbfile | 210 | self.dbfile = dbfile | 
| 123 | self.table = None | 211 | self.table = None | 
| 124 | self.read_only = read_only | 212 | self.read_only = read_only | 
| 213 | self.upstream = upstream | ||
| 125 | 214 | ||
| 126 | def accept_client(self, socket): | 215 | def accept_client(self, socket): | 
| 127 | return PRServerClient(socket, self) | 216 | return PRServerClient(socket, self) | 
| @@ -134,27 +223,25 @@ class PRServer(bb.asyncrpc.AsyncServer): | |||
| 134 | self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % | 223 | self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % | 
| 135 | (self.dbfile, self.address, str(os.getpid()))) | 224 | (self.dbfile, self.address, str(os.getpid()))) | 
| 136 | 225 | ||
| 226 | if self.upstream is not None: | ||
| 227 | self.logger.info("And upstream PRServer: %s " % (self.upstream)) | ||
| 228 | |||
| 137 | return tasks | 229 | return tasks | 
| 138 | 230 | ||
| 139 | async def stop(self): | 231 | async def stop(self): | 
| 140 | self.table.sync_if_dirty() | ||
| 141 | self.db.disconnect() | 232 | self.db.disconnect() | 
| 142 | await super().stop() | 233 | await super().stop() | 
| 143 | 234 | ||
| 144 | def signal_handler(self): | ||
| 145 | super().signal_handler() | ||
| 146 | if self.table: | ||
| 147 | self.table.sync() | ||
| 148 | |||
| 149 | class PRServSingleton(object): | 235 | class PRServSingleton(object): | 
| 150 | def __init__(self, dbfile, logfile, host, port): | 236 | def __init__(self, dbfile, logfile, host, port, upstream): | 
| 151 | self.dbfile = dbfile | 237 | self.dbfile = dbfile | 
| 152 | self.logfile = logfile | 238 | self.logfile = logfile | 
| 153 | self.host = host | 239 | self.host = host | 
| 154 | self.port = port | 240 | self.port = port | 
| 241 | self.upstream = upstream | ||
| 155 | 242 | ||
| 156 | def start(self): | 243 | def start(self): | 
| 157 | self.prserv = PRServer(self.dbfile) | 244 | self.prserv = PRServer(self.dbfile, upstream=self.upstream) | 
| 158 | self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) | 245 | self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) | 
| 159 | self.process = self.prserv.serve_as_process(log_level=logging.WARNING) | 246 | self.process = self.prserv.serve_as_process(log_level=logging.WARNING) | 
| 160 | 247 | ||
| @@ -233,7 +320,7 @@ def run_as_daemon(func, pidfile, logfile): | |||
| 233 | os.remove(pidfile) | 320 | os.remove(pidfile) | 
| 234 | os._exit(0) | 321 | os._exit(0) | 
| 235 | 322 | ||
| 236 | def start_daemon(dbfile, host, port, logfile, read_only=False): | 323 | def start_daemon(dbfile, host, port, logfile, read_only=False, upstream=None): | 
| 237 | ip = socket.gethostbyname(host) | 324 | ip = socket.gethostbyname(host) | 
| 238 | pidfile = PIDPREFIX % (ip, port) | 325 | pidfile = PIDPREFIX % (ip, port) | 
| 239 | try: | 326 | try: | 
| @@ -249,7 +336,7 @@ def start_daemon(dbfile, host, port, logfile, read_only=False): | |||
| 249 | 336 | ||
| 250 | dbfile = os.path.abspath(dbfile) | 337 | dbfile = os.path.abspath(dbfile) | 
| 251 | def daemon_main(): | 338 | def daemon_main(): | 
| 252 | server = PRServer(dbfile, read_only=read_only) | 339 | server = PRServer(dbfile, read_only=read_only, upstream=upstream) | 
| 253 | server.start_tcp_server(ip, port) | 340 | server.start_tcp_server(ip, port) | 
| 254 | server.serve_forever() | 341 | server.serve_forever() | 
| 255 | 342 | ||
| @@ -336,6 +423,9 @@ def auto_start(d): | |||
| 336 | 423 | ||
| 337 | host = host_params[0].strip().lower() | 424 | host = host_params[0].strip().lower() | 
| 338 | port = int(host_params[1]) | 425 | port = int(host_params[1]) | 
| 426 | |||
| 427 | upstream = d.getVar("PRSERV_UPSTREAM") or None | ||
| 428 | |||
| 339 | if is_local_special(host, port): | 429 | if is_local_special(host, port): | 
| 340 | import bb.utils | 430 | import bb.utils | 
| 341 | cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) | 431 | cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) | 
| @@ -350,7 +440,7 @@ def auto_start(d): | |||
| 350 | auto_shutdown() | 440 | auto_shutdown() | 
| 351 | if not singleton: | 441 | if not singleton: | 
| 352 | bb.utils.mkdirhier(cachedir) | 442 | bb.utils.mkdirhier(cachedir) | 
| 353 | singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port) | 443 | singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port, upstream) | 
| 354 | singleton.start() | 444 | singleton.start() | 
| 355 | if singleton: | 445 | if singleton: | 
| 356 | host = singleton.host | 446 | host = singleton.host | 
