diff options
Diffstat (limited to 'bitbake/lib/prserv/serv.py')
-rw-r--r-- | bitbake/lib/prserv/serv.py | 140 |
1 files changed, 115 insertions, 25 deletions
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py index dc4be5b620..e175886308 100644 --- a/bitbake/lib/prserv/serv.py +++ b/bitbake/lib/prserv/serv.py | |||
@@ -12,6 +12,7 @@ import sqlite3 | |||
12 | import prserv | 12 | import prserv |
13 | import prserv.db | 13 | import prserv.db |
14 | import errno | 14 | import errno |
15 | from . import create_async_client, revision_smaller, increase_revision | ||
15 | import bb.asyncrpc | 16 | import bb.asyncrpc |
16 | 17 | ||
17 | logger = logging.getLogger("BitBake.PRserv") | 18 | logger = logging.getLogger("BitBake.PRserv") |
@@ -41,18 +42,16 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
41 | try: | 42 | try: |
42 | return await super().dispatch_message(msg) | 43 | return await super().dispatch_message(msg) |
43 | except: | 44 | except: |
44 | self.server.table.sync() | ||
45 | raise | 45 | raise |
46 | else: | ||
47 | self.server.table.sync_if_dirty() | ||
48 | 46 | ||
49 | async def handle_test_pr(self, request): | 47 | async def handle_test_pr(self, request): |
50 | '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value''' | 48 | '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value''' |
51 | version = request["version"] | 49 | version = request["version"] |
52 | pkgarch = request["pkgarch"] | 50 | pkgarch = request["pkgarch"] |
53 | checksum = request["checksum"] | 51 | checksum = request["checksum"] |
52 | history = request["history"] | ||
54 | 53 | ||
55 | value = self.server.table.find_value(version, pkgarch, checksum) | 54 | value = self.server.table.find_value(version, pkgarch, checksum, history) |
56 | return {"value": value} | 55 | return {"value": value} |
57 | 56 | ||
58 | async def handle_test_package(self, request): | 57 | async def handle_test_package(self, request): |
@@ -68,22 +67,110 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
68 | version = request["version"] | 67 | version = request["version"] |
69 | pkgarch = request["pkgarch"] | 68 | pkgarch = request["pkgarch"] |
70 | 69 | ||
71 | value = self.server.table.find_max_value(version, pkgarch) | 70 | value = self.server.table.find_package_max_value(version, pkgarch) |
72 | return {"value": value} | 71 | return {"value": value} |
73 | 72 | ||
74 | async def handle_get_pr(self, request): | 73 | async def handle_get_pr(self, request): |
75 | version = request["version"] | 74 | version = request["version"] |
76 | pkgarch = request["pkgarch"] | 75 | pkgarch = request["pkgarch"] |
77 | checksum = request["checksum"] | 76 | checksum = request["checksum"] |
77 | history = request["history"] | ||
78 | 78 | ||
79 | response = None | 79 | if self.upstream_client is None: |
80 | try: | 80 | value = self.server.table.get_value(version, pkgarch, checksum, history) |
81 | value = self.server.table.get_value(version, pkgarch, checksum) | 81 | return {"value": value} |
82 | response = {"value": value} | ||
83 | except prserv.NotFoundError: | ||
84 | self.logger.error("failure storing value in database for (%s, %s)",version, checksum) | ||
85 | 82 | ||
86 | return response | 83 | # We have an upstream server. |
84 | # Check whether the local server already knows the requested configuration. | ||
85 | # If the configuration is a new one, the generated value we will add will | ||
86 | # depend on what's on the upstream server. That's why we're calling find_value() | ||
87 | # instead of get_value() directly. | ||
88 | |||
89 | value = self.server.table.find_value(version, pkgarch, checksum, history) | ||
90 | upstream_max = await self.upstream_client.max_package_pr(version, pkgarch) | ||
91 | |||
92 | if value is not None: | ||
93 | |||
94 | # The configuration is already known locally. | ||
95 | |||
96 | if history: | ||
97 | value = self.server.table.get_value(version, pkgarch, checksum, history) | ||
98 | else: | ||
99 | existing_value = value | ||
100 | # In "no history", we need to make sure the value doesn't decrease | ||
101 | # and is at least greater than the maximum upstream value | ||
102 | # and the maximum local value | ||
103 | |||
104 | local_max = self.server.table.find_package_max_value(version, pkgarch) | ||
105 | if revision_smaller(value, local_max): | ||
106 | value = increase_revision(local_max) | ||
107 | |||
108 | if revision_smaller(value, upstream_max): | ||
109 | # Ask upstream whether it knows the checksum | ||
110 | upstream_value = await self.upstream_client.test_pr(version, pkgarch, checksum) | ||
111 | if upstream_value is None: | ||
112 | # Upstream doesn't have our checksum, let create a new one | ||
113 | value = upstream_max + ".0" | ||
114 | else: | ||
115 | # Fine to take the same value as upstream | ||
116 | value = upstream_max | ||
117 | |||
118 | if not value == existing_value and not self.server.read_only: | ||
119 | self.server.table.store_value(version, pkgarch, checksum, value) | ||
120 | |||
121 | return {"value": value} | ||
122 | |||
123 | # The configuration is a new one for the local server | ||
124 | # Let's ask the upstream server whether it knows it | ||
125 | |||
126 | known_upstream = await self.upstream_client.test_package(version, pkgarch) | ||
127 | |||
128 | if not known_upstream: | ||
129 | |||
130 | # The package is not known upstream, must be a local-only package | ||
131 | # Let's compute the PR number using the local-only method | ||
132 | |||
133 | value = self.server.table.get_value(version, pkgarch, checksum, history) | ||
134 | return {"value": value} | ||
135 | |||
136 | # The package is known upstream, let's ask the upstream server | ||
137 | # whether it knows our new output hash | ||
138 | |||
139 | value = await self.upstream_client.test_pr(version, pkgarch, checksum) | ||
140 | |||
141 | if value is not None: | ||
142 | |||
143 | # Upstream knows this output hash, let's store it and use it too. | ||
144 | |||
145 | if not self.server.read_only: | ||
146 | self.server.table.store_value(version, pkgarch, checksum, value) | ||
147 | # If the local server is read only, won't be able to store the new | ||
148 | # value in the database and will have to keep asking the upstream server | ||
149 | return {"value": value} | ||
150 | |||
151 | # The output hash doesn't exist upstream, get the most recent number from upstream (x) | ||
152 | # Then, we want to have a new PR value for the local server: x.y | ||
153 | |||
154 | upstream_max = await self.upstream_client.max_package_pr(version, pkgarch) | ||
155 | # Here we know that the package is known upstream, so upstream_max can't be None | ||
156 | subvalue = self.server.table.find_new_subvalue(version, pkgarch, upstream_max) | ||
157 | |||
158 | if not self.server.read_only: | ||
159 | self.server.table.store_value(version, pkgarch, checksum, subvalue) | ||
160 | |||
161 | return {"value": subvalue} | ||
162 | |||
163 | async def process_requests(self): | ||
164 | if self.server.upstream is not None: | ||
165 | self.upstream_client = await create_async_client(self.server.upstream) | ||
166 | else: | ||
167 | self.upstream_client = None | ||
168 | |||
169 | try: | ||
170 | await super().process_requests() | ||
171 | finally: | ||
172 | if self.upstream_client is not None: | ||
173 | await self.upstream_client.close() | ||
87 | 174 | ||
88 | async def handle_import_one(self, request): | 175 | async def handle_import_one(self, request): |
89 | response = None | 176 | response = None |
@@ -104,9 +191,10 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
104 | pkgarch = request["pkgarch"] | 191 | pkgarch = request["pkgarch"] |
105 | checksum = request["checksum"] | 192 | checksum = request["checksum"] |
106 | colinfo = request["colinfo"] | 193 | colinfo = request["colinfo"] |
194 | history = request["history"] | ||
107 | 195 | ||
108 | try: | 196 | try: |
109 | (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo) | 197 | (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo, history) |
110 | except sqlite3.Error as exc: | 198 | except sqlite3.Error as exc: |
111 | self.logger.error(str(exc)) | 199 | self.logger.error(str(exc)) |
112 | metainfo = datainfo = None | 200 | metainfo = datainfo = None |
@@ -117,11 +205,12 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
117 | return {"readonly": self.server.read_only} | 205 | return {"readonly": self.server.read_only} |
118 | 206 | ||
119 | class PRServer(bb.asyncrpc.AsyncServer): | 207 | class PRServer(bb.asyncrpc.AsyncServer): |
120 | def __init__(self, dbfile, read_only=False): | 208 | def __init__(self, dbfile, read_only=False, upstream=None): |
121 | super().__init__(logger) | 209 | super().__init__(logger) |
122 | self.dbfile = dbfile | 210 | self.dbfile = dbfile |
123 | self.table = None | 211 | self.table = None |
124 | self.read_only = read_only | 212 | self.read_only = read_only |
213 | self.upstream = upstream | ||
125 | 214 | ||
126 | def accept_client(self, socket): | 215 | def accept_client(self, socket): |
127 | return PRServerClient(socket, self) | 216 | return PRServerClient(socket, self) |
@@ -134,27 +223,25 @@ class PRServer(bb.asyncrpc.AsyncServer): | |||
134 | self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % | 223 | self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % |
135 | (self.dbfile, self.address, str(os.getpid()))) | 224 | (self.dbfile, self.address, str(os.getpid()))) |
136 | 225 | ||
226 | if self.upstream is not None: | ||
227 | self.logger.info("And upstream PRServer: %s " % (self.upstream)) | ||
228 | |||
137 | return tasks | 229 | return tasks |
138 | 230 | ||
139 | async def stop(self): | 231 | async def stop(self): |
140 | self.table.sync_if_dirty() | ||
141 | self.db.disconnect() | 232 | self.db.disconnect() |
142 | await super().stop() | 233 | await super().stop() |
143 | 234 | ||
144 | def signal_handler(self): | ||
145 | super().signal_handler() | ||
146 | if self.table: | ||
147 | self.table.sync() | ||
148 | |||
149 | class PRServSingleton(object): | 235 | class PRServSingleton(object): |
150 | def __init__(self, dbfile, logfile, host, port): | 236 | def __init__(self, dbfile, logfile, host, port, upstream): |
151 | self.dbfile = dbfile | 237 | self.dbfile = dbfile |
152 | self.logfile = logfile | 238 | self.logfile = logfile |
153 | self.host = host | 239 | self.host = host |
154 | self.port = port | 240 | self.port = port |
241 | self.upstream = upstream | ||
155 | 242 | ||
156 | def start(self): | 243 | def start(self): |
157 | self.prserv = PRServer(self.dbfile) | 244 | self.prserv = PRServer(self.dbfile, upstream=self.upstream) |
158 | self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) | 245 | self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) |
159 | self.process = self.prserv.serve_as_process(log_level=logging.WARNING) | 246 | self.process = self.prserv.serve_as_process(log_level=logging.WARNING) |
160 | 247 | ||
@@ -233,7 +320,7 @@ def run_as_daemon(func, pidfile, logfile): | |||
233 | os.remove(pidfile) | 320 | os.remove(pidfile) |
234 | os._exit(0) | 321 | os._exit(0) |
235 | 322 | ||
236 | def start_daemon(dbfile, host, port, logfile, read_only=False): | 323 | def start_daemon(dbfile, host, port, logfile, read_only=False, upstream=None): |
237 | ip = socket.gethostbyname(host) | 324 | ip = socket.gethostbyname(host) |
238 | pidfile = PIDPREFIX % (ip, port) | 325 | pidfile = PIDPREFIX % (ip, port) |
239 | try: | 326 | try: |
@@ -249,7 +336,7 @@ def start_daemon(dbfile, host, port, logfile, read_only=False): | |||
249 | 336 | ||
250 | dbfile = os.path.abspath(dbfile) | 337 | dbfile = os.path.abspath(dbfile) |
251 | def daemon_main(): | 338 | def daemon_main(): |
252 | server = PRServer(dbfile, read_only=read_only) | 339 | server = PRServer(dbfile, read_only=read_only, upstream=upstream) |
253 | server.start_tcp_server(ip, port) | 340 | server.start_tcp_server(ip, port) |
254 | server.serve_forever() | 341 | server.serve_forever() |
255 | 342 | ||
@@ -336,6 +423,9 @@ def auto_start(d): | |||
336 | 423 | ||
337 | host = host_params[0].strip().lower() | 424 | host = host_params[0].strip().lower() |
338 | port = int(host_params[1]) | 425 | port = int(host_params[1]) |
426 | |||
427 | upstream = d.getVar("PRSERV_UPSTREAM") or None | ||
428 | |||
339 | if is_local_special(host, port): | 429 | if is_local_special(host, port): |
340 | import bb.utils | 430 | import bb.utils |
341 | cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) | 431 | cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) |
@@ -350,7 +440,7 @@ def auto_start(d): | |||
350 | auto_shutdown() | 440 | auto_shutdown() |
351 | if not singleton: | 441 | if not singleton: |
352 | bb.utils.mkdirhier(cachedir) | 442 | bb.utils.mkdirhier(cachedir) |
353 | singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port) | 443 | singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port, upstream) |
354 | singleton.start() | 444 | singleton.start() |
355 | if singleton: | 445 | if singleton: |
356 | host = singleton.host | 446 | host = singleton.host |