diff options
-rw-r--r-- | bitbake/lib/hashserv/__init__.py | 66 | ||||
-rw-r--r-- | bitbake/lib/hashserv/client.py | 1 | ||||
-rw-r--r-- | bitbake/lib/hashserv/server.py | 340 | ||||
-rw-r--r-- | bitbake/lib/hashserv/tests.py | 60 |
4 files changed, 314 insertions, 153 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 5f2e101e52..9cb3fd57a5 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
@@ -22,46 +22,68 @@ ADDR_TYPE_TCP = 1 | |||
22 | # is necessary | 22 | # is necessary |
23 | DEFAULT_MAX_CHUNK = 32 * 1024 | 23 | DEFAULT_MAX_CHUNK = 32 * 1024 |
24 | 24 | ||
25 | TABLE_DEFINITION = ( | 25 | UNIHASH_TABLE_DEFINITION = ( |
26 | ("method", "TEXT NOT NULL"), | 26 | ("method", "TEXT NOT NULL", "UNIQUE"), |
27 | ("outhash", "TEXT NOT NULL"), | 27 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), |
28 | ("taskhash", "TEXT NOT NULL"), | 28 | ("unihash", "TEXT NOT NULL", ""), |
29 | ("unihash", "TEXT NOT NULL"), | 29 | ) |
30 | ("created", "DATETIME"), | 30 | |
31 | UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION) | ||
32 | |||
33 | OUTHASH_TABLE_DEFINITION = ( | ||
34 | ("method", "TEXT NOT NULL", "UNIQUE"), | ||
35 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), | ||
36 | ("outhash", "TEXT NOT NULL", "UNIQUE"), | ||
37 | ("created", "DATETIME", ""), | ||
31 | 38 | ||
32 | # Optional fields | 39 | # Optional fields |
33 | ("owner", "TEXT"), | 40 | ("owner", "TEXT", ""), |
34 | ("PN", "TEXT"), | 41 | ("PN", "TEXT", ""), |
35 | ("PV", "TEXT"), | 42 | ("PV", "TEXT", ""), |
36 | ("PR", "TEXT"), | 43 | ("PR", "TEXT", ""), |
37 | ("task", "TEXT"), | 44 | ("task", "TEXT", ""), |
38 | ("outhash_siginfo", "TEXT"), | 45 | ("outhash_siginfo", "TEXT", ""), |
39 | ) | 46 | ) |
40 | 47 | ||
41 | TABLE_COLUMNS = tuple(name for name, _ in TABLE_DEFINITION) | 48 | OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION) |
49 | |||
50 | def _make_table(cursor, name, definition): | ||
51 | cursor.execute(''' | ||
52 | CREATE TABLE IF NOT EXISTS {name} ( | ||
53 | id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
54 | {fields} | ||
55 | UNIQUE({unique}) | ||
56 | ) | ||
57 | '''.format( | ||
58 | name=name, | ||
59 | fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition), | ||
60 | unique=", ".join(name for name, _, flags in definition if "UNIQUE" in flags) | ||
61 | )) | ||
62 | |||
42 | 63 | ||
43 | def setup_database(database, sync=True): | 64 | def setup_database(database, sync=True): |
44 | db = sqlite3.connect(database) | 65 | db = sqlite3.connect(database) |
45 | db.row_factory = sqlite3.Row | 66 | db.row_factory = sqlite3.Row |
46 | 67 | ||
47 | with closing(db.cursor()) as cursor: | 68 | with closing(db.cursor()) as cursor: |
48 | cursor.execute(''' | 69 | _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION) |
49 | CREATE TABLE IF NOT EXISTS tasks_v2 ( | 70 | _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) |
50 | id INTEGER PRIMARY KEY AUTOINCREMENT, | 71 | |
51 | %s | ||
52 | UNIQUE(method, outhash, taskhash) | ||
53 | ) | ||
54 | ''' % " ".join("%s %s," % (name, typ) for name, typ in TABLE_DEFINITION)) | ||
55 | cursor.execute('PRAGMA journal_mode = WAL') | 72 | cursor.execute('PRAGMA journal_mode = WAL') |
56 | cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF')) | 73 | cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF')) |
57 | 74 | ||
58 | # Drop old indexes | 75 | # Drop old indexes |
59 | cursor.execute('DROP INDEX IF EXISTS taskhash_lookup') | 76 | cursor.execute('DROP INDEX IF EXISTS taskhash_lookup') |
60 | cursor.execute('DROP INDEX IF EXISTS outhash_lookup') | 77 | cursor.execute('DROP INDEX IF EXISTS outhash_lookup') |
78 | cursor.execute('DROP INDEX IF EXISTS taskhash_lookup_v2') | ||
79 | cursor.execute('DROP INDEX IF EXISTS outhash_lookup_v2') | ||
80 | |||
81 | # TODO: Upgrade from tasks_v2? | ||
82 | cursor.execute('DROP TABLE IF EXISTS tasks_v2') | ||
61 | 83 | ||
62 | # Create new indexes | 84 | # Create new indexes |
63 | cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)') | 85 | cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)') |
64 | cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)') | 86 | cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)') |
65 | 87 | ||
66 | return db | 88 | return db |
67 | 89 | ||
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 8cfd90d6a8..b2aa1026ac 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -111,6 +111,7 @@ class Client(bb.asyncrpc.Client): | |||
111 | "report_unihash", | 111 | "report_unihash", |
112 | "report_unihash_equiv", | 112 | "report_unihash_equiv", |
113 | "get_taskhash", | 113 | "get_taskhash", |
114 | "get_outhash", | ||
114 | "get_stats", | 115 | "get_stats", |
115 | "reset_stats", | 116 | "reset_stats", |
116 | "backfill_wait", | 117 | "backfill_wait", |
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index a059e52115..ef8227d430 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
@@ -5,11 +5,12 @@ | |||
5 | 5 | ||
6 | from contextlib import closing, contextmanager | 6 | from contextlib import closing, contextmanager |
7 | from datetime import datetime | 7 | from datetime import datetime |
8 | import enum | ||
8 | import asyncio | 9 | import asyncio |
9 | import logging | 10 | import logging |
10 | import math | 11 | import math |
11 | import time | 12 | import time |
12 | from . import create_async_client, TABLE_COLUMNS | 13 | from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS |
13 | import bb.asyncrpc | 14 | import bb.asyncrpc |
14 | 15 | ||
15 | 16 | ||
@@ -106,56 +107,64 @@ class Stats(object): | |||
106 | return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')} | 107 | return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')} |
107 | 108 | ||
108 | 109 | ||
109 | def insert_task(cursor, data, ignore=False): | 110 | @enum.unique |
111 | class Resolve(enum.Enum): | ||
112 | FAIL = enum.auto() | ||
113 | IGNORE = enum.auto() | ||
114 | REPLACE = enum.auto() | ||
115 | |||
116 | |||
117 | def insert_table(cursor, table, data, on_conflict): | ||
118 | resolve = { | ||
119 | Resolve.FAIL: "", | ||
120 | Resolve.IGNORE: " OR IGNORE", | ||
121 | Resolve.REPLACE: " OR REPLACE", | ||
122 | }[on_conflict] | ||
123 | |||
110 | keys = sorted(data.keys()) | 124 | keys = sorted(data.keys()) |
111 | query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % ( | 125 | query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format( |
112 | " OR IGNORE" if ignore else "", | 126 | resolve=resolve, |
113 | ', '.join(keys), | 127 | table=table, |
114 | ', '.join(':' + k for k in keys)) | 128 | fields=", ".join(keys), |
129 | values=", ".join(":" + k for k in keys), | ||
130 | ) | ||
131 | prevrowid = cursor.lastrowid | ||
115 | cursor.execute(query, data) | 132 | cursor.execute(query, data) |
116 | 133 | logging.debug( | |
117 | async def copy_from_upstream(client, db, method, taskhash): | 134 | "Inserting %r into %s, %s", |
118 | d = await client.get_taskhash(method, taskhash, True) | 135 | data, |
136 | table, | ||
137 | on_conflict | ||
138 | ) | ||
139 | return (cursor.lastrowid, cursor.lastrowid != prevrowid) | ||
140 | |||
141 | def insert_unihash(cursor, data, on_conflict): | ||
142 | return insert_table(cursor, "unihashes_v2", data, on_conflict) | ||
143 | |||
144 | def insert_outhash(cursor, data, on_conflict): | ||
145 | return insert_table(cursor, "outhashes_v2", data, on_conflict) | ||
146 | |||
147 | async def copy_unihash_from_upstream(client, db, method, taskhash): | ||
148 | d = await client.get_taskhash(method, taskhash) | ||
119 | if d is not None: | 149 | if d is not None: |
120 | # Filter out unknown columns | ||
121 | d = {k: v for k, v in d.items() if k in TABLE_COLUMNS} | ||
122 | |||
123 | with closing(db.cursor()) as cursor: | 150 | with closing(db.cursor()) as cursor: |
124 | insert_task(cursor, d) | 151 | insert_unihash( |
152 | cursor, | ||
153 | {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}, | ||
154 | Resolve.IGNORE, | ||
155 | ) | ||
125 | db.commit() | 156 | db.commit() |
126 | |||
127 | return d | 157 | return d |
128 | 158 | ||
129 | async def copy_outhash_from_upstream(client, db, method, outhash, taskhash): | ||
130 | d = await client.get_outhash(method, outhash, taskhash) | ||
131 | if d is not None: | ||
132 | # Filter out unknown columns | ||
133 | d = {k: v for k, v in d.items() if k in TABLE_COLUMNS} | ||
134 | 159 | ||
135 | with closing(db.cursor()) as cursor: | 160 | class ServerCursor(object): |
136 | insert_task(cursor, d) | 161 | def __init__(self, db, cursor, upstream): |
137 | db.commit() | 162 | self.db = db |
163 | self.cursor = cursor | ||
164 | self.upstream = upstream | ||
138 | 165 | ||
139 | return d | ||
140 | 166 | ||
141 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
142 | FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1' | ||
143 | ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1' | ||
144 | OUTHASH_QUERY = ''' | ||
145 | -- Find tasks with a matching outhash (that is, tasks that | ||
146 | -- are equivalent) | ||
147 | SELECT * FROM tasks_v2 WHERE method=:method AND outhash=:outhash | ||
148 | |||
149 | -- If there is an exact match on the taskhash, return it. | ||
150 | -- Otherwise return the oldest matching outhash of any | ||
151 | -- taskhash | ||
152 | ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END, | ||
153 | created ASC | ||
154 | |||
155 | -- Only return one row | ||
156 | LIMIT 1 | ||
157 | ''' | ||
158 | |||
159 | def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): | 168 | def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): |
160 | super().__init__(reader, writer, 'OEHASHEQUIV', logger) | 169 | super().__init__(reader, writer, 'OEHASHEQUIV', logger) |
161 | self.db = db | 170 | self.db = db |
@@ -210,36 +219,102 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
210 | async def handle_get(self, request): | 219 | async def handle_get(self, request): |
211 | method = request['method'] | 220 | method = request['method'] |
212 | taskhash = request['taskhash'] | 221 | taskhash = request['taskhash'] |
222 | fetch_all = request.get('all', False) | ||
213 | 223 | ||
214 | if request.get('all', False): | 224 | with closing(self.db.cursor()) as cursor: |
215 | row = self.query_equivalent(method, taskhash, self.ALL_QUERY) | 225 | d = await self.get_unihash(cursor, method, taskhash, fetch_all) |
216 | else: | ||
217 | row = self.query_equivalent(method, taskhash, self.FAST_QUERY) | ||
218 | 226 | ||
219 | if row is not None: | 227 | self.write_message(d) |
220 | logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | 228 | |
221 | d = {k: row[k] for k in row.keys()} | 229 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): |
222 | elif self.upstream_client is not None: | 230 | d = None |
223 | d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash) | 231 | |
232 | if fetch_all: | ||
233 | cursor.execute( | ||
234 | ''' | ||
235 | SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 | ||
236 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | ||
237 | WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash | ||
238 | ORDER BY outhashes_v2.created ASC | ||
239 | LIMIT 1 | ||
240 | ''', | ||
241 | { | ||
242 | 'method': method, | ||
243 | 'taskhash': taskhash, | ||
244 | } | ||
245 | |||
246 | ) | ||
247 | row = cursor.fetchone() | ||
248 | |||
249 | if row is not None: | ||
250 | d = {k: row[k] for k in row.keys()} | ||
251 | elif self.upstream_client is not None: | ||
252 | d = await self.upstream_client.get_taskhash(method, taskhash, True) | ||
253 | self.update_unified(cursor, d) | ||
254 | self.db.commit() | ||
224 | else: | 255 | else: |
225 | d = None | 256 | row = self.query_equivalent(cursor, method, taskhash) |
257 | |||
258 | if row is not None: | ||
259 | d = {k: row[k] for k in row.keys()} | ||
260 | elif self.upstream_client is not None: | ||
261 | d = await self.upstream_client.get_taskhash(method, taskhash) | ||
262 | d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS} | ||
263 | insert_unihash(cursor, d, Resolve.IGNORE) | ||
264 | self.db.commit() | ||
226 | 265 | ||
227 | self.write_message(d) | 266 | return d |
228 | 267 | ||
229 | async def handle_get_outhash(self, request): | 268 | async def handle_get_outhash(self, request): |
269 | method = request['method'] | ||
270 | outhash = request['outhash'] | ||
271 | taskhash = request['taskhash'] | ||
272 | |||
230 | with closing(self.db.cursor()) as cursor: | 273 | with closing(self.db.cursor()) as cursor: |
231 | cursor.execute(self.OUTHASH_QUERY, | 274 | d = await self.get_outhash(cursor, method, outhash, taskhash) |
232 | {k: request[k] for k in ('method', 'outhash', 'taskhash')}) | ||
233 | 275 | ||
234 | row = cursor.fetchone() | 276 | self.write_message(d) |
277 | |||
278 | async def get_outhash(self, cursor, method, outhash, taskhash): | ||
279 | d = None | ||
280 | cursor.execute( | ||
281 | ''' | ||
282 | SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 | ||
283 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | ||
284 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash | ||
285 | ORDER BY outhashes_v2.created ASC | ||
286 | LIMIT 1 | ||
287 | ''', | ||
288 | { | ||
289 | 'method': method, | ||
290 | 'outhash': outhash, | ||
291 | } | ||
292 | ) | ||
293 | row = cursor.fetchone() | ||
235 | 294 | ||
236 | if row is not None: | 295 | if row is not None: |
237 | logger.debug('Found equivalent outhash %s -> %s', (row['outhash'], row['unihash'])) | ||
238 | d = {k: row[k] for k in row.keys()} | 296 | d = {k: row[k] for k in row.keys()} |
239 | else: | 297 | elif self.upstream_client is not None: |
240 | d = None | 298 | d = await self.upstream_client.get_outhash(method, outhash, taskhash) |
299 | self.update_unified(cursor, d) | ||
300 | self.db.commit() | ||
241 | 301 | ||
242 | self.write_message(d) | 302 | return d |
303 | |||
304 | def update_unified(self, cursor, data): | ||
305 | if data is None: | ||
306 | return | ||
307 | |||
308 | insert_unihash( | ||
309 | cursor, | ||
310 | {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS}, | ||
311 | Resolve.IGNORE | ||
312 | ) | ||
313 | insert_outhash( | ||
314 | cursor, | ||
315 | {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}, | ||
316 | Resolve.IGNORE | ||
317 | ) | ||
243 | 318 | ||
244 | async def handle_get_stream(self, request): | 319 | async def handle_get_stream(self, request): |
245 | self.write_message('ok') | 320 | self.write_message('ok') |
@@ -267,7 +342,12 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
267 | 342 | ||
268 | (method, taskhash) = l.split() | 343 | (method, taskhash) = l.split() |
269 | #logger.debug('Looking up %s %s' % (method, taskhash)) | 344 | #logger.debug('Looking up %s %s' % (method, taskhash)) |
270 | row = self.query_equivalent(method, taskhash, self.FAST_QUERY) | 345 | cursor = self.db.cursor() |
346 | try: | ||
347 | row = self.query_equivalent(cursor, method, taskhash) | ||
348 | finally: | ||
349 | cursor.close() | ||
350 | |||
271 | if row is not None: | 351 | if row is not None: |
272 | msg = ('%s\n' % row['unihash']).encode('utf-8') | 352 | msg = ('%s\n' % row['unihash']).encode('utf-8') |
273 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | 353 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) |
@@ -294,55 +374,82 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
294 | 374 | ||
295 | async def handle_report(self, data): | 375 | async def handle_report(self, data): |
296 | with closing(self.db.cursor()) as cursor: | 376 | with closing(self.db.cursor()) as cursor: |
297 | cursor.execute(self.OUTHASH_QUERY, | 377 | outhash_data = { |
298 | {k: data[k] for k in ('method', 'outhash', 'taskhash')}) | 378 | 'method': data['method'], |
379 | 'outhash': data['outhash'], | ||
380 | 'taskhash': data['taskhash'], | ||
381 | 'created': datetime.now() | ||
382 | } | ||
299 | 383 | ||
300 | row = cursor.fetchone() | 384 | for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): |
385 | if k in data: | ||
386 | outhash_data[k] = data[k] | ||
387 | |||
388 | # Insert the new entry, unless it already exists | ||
389 | (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE) | ||
390 | |||
391 | if inserted: | ||
392 | # If this row is new, check if it is equivalent to another | ||
393 | # output hash | ||
394 | cursor.execute( | ||
395 | ''' | ||
396 | SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 | ||
397 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | ||
398 | -- Select any matching output hash except the one we just inserted | ||
399 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash | ||
400 | -- Pick the oldest hash | ||
401 | ORDER BY outhashes_v2.created ASC | ||
402 | LIMIT 1 | ||
403 | ''', | ||
404 | { | ||
405 | 'method': data['method'], | ||
406 | 'outhash': data['outhash'], | ||
407 | 'taskhash': data['taskhash'], | ||
408 | } | ||
409 | ) | ||
410 | row = cursor.fetchone() | ||
301 | 411 | ||
302 | if row is None and self.upstream_client: | ||
303 | # Try upstream | ||
304 | row = await copy_outhash_from_upstream(self.upstream_client, | ||
305 | self.db, | ||
306 | data['method'], | ||
307 | data['outhash'], | ||
308 | data['taskhash']) | ||
309 | |||
310 | # If no matching outhash was found, or one *was* found but it | ||
311 | # wasn't an exact match on the taskhash, a new entry for this | ||
312 | # taskhash should be added | ||
313 | if row is None or row['taskhash'] != data['taskhash']: | ||
314 | # If a row matching the outhash was found, the unihash for | ||
315 | # the new taskhash should be the same as that one. | ||
316 | # Otherwise the caller provided unihash is used. | ||
317 | unihash = data['unihash'] | ||
318 | if row is not None: | 412 | if row is not None: |
413 | # A matching output hash was found. Set our taskhash to the | ||
414 | # same unihash since they are equivalent | ||
319 | unihash = row['unihash'] | 415 | unihash = row['unihash'] |
416 | resolve = Resolve.REPLACE | ||
417 | else: | ||
418 | # No matching output hash was found. This is probably the | ||
419 | # first outhash to be added. | ||
420 | unihash = data['unihash'] | ||
421 | resolve = Resolve.IGNORE | ||
422 | |||
423 | # Query upstream to see if it has a unihash we can use | ||
424 | if self.upstream_client is not None: | ||
425 | upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash']) | ||
426 | if upstream_data is not None: | ||
427 | unihash = upstream_data['unihash'] | ||
428 | |||
429 | |||
430 | insert_unihash( | ||
431 | cursor, | ||
432 | { | ||
433 | 'method': data['method'], | ||
434 | 'taskhash': data['taskhash'], | ||
435 | 'unihash': unihash, | ||
436 | }, | ||
437 | resolve | ||
438 | ) | ||
439 | |||
440 | unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash']) | ||
441 | if unihash_data is not None: | ||
442 | unihash = unihash_data['unihash'] | ||
443 | else: | ||
444 | unihash = data['unihash'] | ||
320 | 445 | ||
321 | insert_data = { | 446 | self.db.commit() |
322 | 'method': data['method'], | ||
323 | 'outhash': data['outhash'], | ||
324 | 'taskhash': data['taskhash'], | ||
325 | 'unihash': unihash, | ||
326 | 'created': datetime.now() | ||
327 | } | ||
328 | |||
329 | for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): | ||
330 | if k in data: | ||
331 | insert_data[k] = data[k] | ||
332 | |||
333 | insert_task(cursor, insert_data) | ||
334 | self.db.commit() | ||
335 | |||
336 | logger.info('Adding taskhash %s with unihash %s', | ||
337 | data['taskhash'], unihash) | ||
338 | 447 | ||
339 | d = { | 448 | d = { |
340 | 'taskhash': data['taskhash'], | 449 | 'taskhash': data['taskhash'], |
341 | 'method': data['method'], | 450 | 'method': data['method'], |
342 | 'unihash': unihash | 451 | 'unihash': unihash, |
343 | } | 452 | } |
344 | else: | ||
345 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} | ||
346 | 453 | ||
347 | self.write_message(d) | 454 | self.write_message(d) |
348 | 455 | ||
@@ -350,23 +457,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
350 | with closing(self.db.cursor()) as cursor: | 457 | with closing(self.db.cursor()) as cursor: |
351 | insert_data = { | 458 | insert_data = { |
352 | 'method': data['method'], | 459 | 'method': data['method'], |
353 | 'outhash': "", | ||
354 | 'taskhash': data['taskhash'], | 460 | 'taskhash': data['taskhash'], |
355 | 'unihash': data['unihash'], | 461 | 'unihash': data['unihash'], |
356 | 'created': datetime.now() | ||
357 | } | 462 | } |
358 | 463 | insert_unihash(cursor, insert_data, Resolve.IGNORE) | |
359 | for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): | ||
360 | if k in data: | ||
361 | insert_data[k] = data[k] | ||
362 | |||
363 | insert_task(cursor, insert_data, ignore=True) | ||
364 | self.db.commit() | 464 | self.db.commit() |
365 | 465 | ||
366 | # Fetch the unihash that will be reported for the taskhash. If the | 466 | # Fetch the unihash that will be reported for the taskhash. If the |
367 | # unihash matches, it means this row was inserted (or the mapping | 467 | # unihash matches, it means this row was inserted (or the mapping |
368 | # was already valid) | 468 | # was already valid) |
369 | row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY) | 469 | row = self.query_equivalent(cursor, data['method'], data['taskhash']) |
370 | 470 | ||
371 | if row['unihash'] == data['unihash']: | 471 | if row['unihash'] == data['unihash']: |
372 | logger.info('Adding taskhash equivalence for %s with unihash %s', | 472 | logger.info('Adding taskhash equivalence for %s with unihash %s', |
@@ -399,14 +499,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
399 | await self.backfill_queue.join() | 499 | await self.backfill_queue.join() |
400 | self.write_message(d) | 500 | self.write_message(d) |
401 | 501 | ||
402 | def query_equivalent(self, method, taskhash, query): | 502 | def query_equivalent(self, cursor, method, taskhash): |
403 | # This is part of the inner loop and must be as fast as possible | 503 | # This is part of the inner loop and must be as fast as possible |
404 | try: | 504 | cursor.execute( |
405 | cursor = self.db.cursor() | 505 | 'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash', |
406 | cursor.execute(query, {'method': method, 'taskhash': taskhash}) | 506 | { |
407 | return cursor.fetchone() | 507 | 'method': method, |
408 | except: | 508 | 'taskhash': taskhash, |
409 | cursor.close() | 509 | } |
510 | ) | ||
511 | return cursor.fetchone() | ||
410 | 512 | ||
411 | 513 | ||
412 | class Server(bb.asyncrpc.AsyncServer): | 514 | class Server(bb.asyncrpc.AsyncServer): |
@@ -435,7 +537,7 @@ class Server(bb.asyncrpc.AsyncServer): | |||
435 | self.backfill_queue.task_done() | 537 | self.backfill_queue.task_done() |
436 | break | 538 | break |
437 | method, taskhash = item | 539 | method, taskhash = item |
438 | await copy_from_upstream(client, self.db, method, taskhash) | 540 | await copy_unihash_from_upstream(client, self.db, method, taskhash) |
439 | self.backfill_queue.task_done() | 541 | self.backfill_queue.task_done() |
440 | finally: | 542 | finally: |
441 | await client.close() | 543 | await client.close() |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 1fcfb6b929..efaf3bdf42 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
@@ -19,10 +19,10 @@ import time | |||
19 | import signal | 19 | import signal |
20 | 20 | ||
21 | def server_prefunc(server, idx): | 21 | def server_prefunc(server, idx): |
22 | logging.basicConfig(level=logging.DEBUG, filename='bbhashserv.log', filemode='w', | 22 | logging.basicConfig(level=logging.DEBUG, filename='bbhashserv-%d.log' % idx, filemode='w', |
23 | format='%(levelname)s %(filename)s:%(lineno)d %(message)s') | 23 | format='%(levelname)s %(filename)s:%(lineno)d %(message)s') |
24 | server.logger.debug("Running server %d" % idx) | 24 | server.logger.debug("Running server %d" % idx) |
25 | sys.stdout = open('bbhashserv-%d.log' % idx, 'w') | 25 | sys.stdout = open('bbhashserv-stdout-%d.log' % idx, 'w') |
26 | sys.stderr = sys.stdout | 26 | sys.stderr = sys.stdout |
27 | 27 | ||
28 | class HashEquivalenceTestSetup(object): | 28 | class HashEquivalenceTestSetup(object): |
@@ -140,12 +140,17 @@ class HashEquivalenceCommonTests(object): | |||
140 | }) | 140 | }) |
141 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | 141 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') |
142 | 142 | ||
143 | result = self.client.get_taskhash(self.METHOD, taskhash, True) | 143 | result_unihash = self.client.get_taskhash(self.METHOD, taskhash, True) |
144 | self.assertEqual(result['taskhash'], taskhash) | 144 | self.assertEqual(result_unihash['taskhash'], taskhash) |
145 | self.assertEqual(result['unihash'], unihash) | 145 | self.assertEqual(result_unihash['unihash'], unihash) |
146 | self.assertEqual(result['method'], self.METHOD) | 146 | self.assertEqual(result_unihash['method'], self.METHOD) |
147 | self.assertEqual(result['outhash'], outhash) | 147 | |
148 | self.assertEqual(result['outhash_siginfo'], siginfo) | 148 | result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash) |
149 | self.assertEqual(result_outhash['taskhash'], taskhash) | ||
150 | self.assertEqual(result_outhash['method'], self.METHOD) | ||
151 | self.assertEqual(result_outhash['unihash'], unihash) | ||
152 | self.assertEqual(result_outhash['outhash'], outhash) | ||
153 | self.assertEqual(result_outhash['outhash_siginfo'], siginfo) | ||
149 | 154 | ||
150 | def test_stress(self): | 155 | def test_stress(self): |
151 | def query_server(failures): | 156 | def query_server(failures): |
@@ -260,6 +265,39 @@ class HashEquivalenceCommonTests(object): | |||
260 | result = down_client.report_unihash(taskhash6, self.METHOD, outhash5, unihash6) | 265 | result = down_client.report_unihash(taskhash6, self.METHOD, outhash5, unihash6) |
261 | self.assertEqual(result['unihash'], unihash5, 'Server failed to copy unihash from upstream') | 266 | self.assertEqual(result['unihash'], unihash5, 'Server failed to copy unihash from upstream') |
262 | 267 | ||
268 | # Tests read through from server with | ||
269 | taskhash7 = '9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74' | ||
270 | outhash7 = '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69' | ||
271 | unihash7 = '05d2a63c81e32f0a36542ca677e8ad852365c538' | ||
272 | self.client.report_unihash(taskhash7, self.METHOD, outhash7, unihash7) | ||
273 | |||
274 | result = down_client.get_taskhash(self.METHOD, taskhash7, True) | ||
275 | self.assertEqual(result['unihash'], unihash7, 'Server failed to copy unihash from upstream') | ||
276 | self.assertEqual(result['outhash'], outhash7, 'Server failed to copy unihash from upstream') | ||
277 | self.assertEqual(result['taskhash'], taskhash7, 'Server failed to copy unihash from upstream') | ||
278 | self.assertEqual(result['method'], self.METHOD) | ||
279 | |||
280 | taskhash8 = '86978a4c8c71b9b487330b0152aade10c1ee58aa' | ||
281 | outhash8 = 'ca8c128e9d9e4a28ef24d0508aa20b5cf880604eacd8f65c0e366f7e0cc5fbcf' | ||
282 | unihash8 = 'd8bcf25369d40590ad7d08c84d538982f2023e01' | ||
283 | self.client.report_unihash(taskhash8, self.METHOD, outhash8, unihash8) | ||
284 | |||
285 | result = down_client.get_outhash(self.METHOD, outhash8, taskhash8) | ||
286 | self.assertEqual(result['unihash'], unihash8, 'Server failed to copy unihash from upstream') | ||
287 | self.assertEqual(result['outhash'], outhash8, 'Server failed to copy unihash from upstream') | ||
288 | self.assertEqual(result['taskhash'], taskhash8, 'Server failed to copy unihash from upstream') | ||
289 | self.assertEqual(result['method'], self.METHOD) | ||
290 | |||
291 | taskhash9 = 'ae6339531895ddf5b67e663e6a374ad8ec71d81c' | ||
292 | outhash9 = 'afc78172c81880ae10a1fec994b5b4ee33d196a001a1b66212a15ebe573e00b5' | ||
293 | unihash9 = '6662e699d6e3d894b24408ff9a4031ef9b038ee8' | ||
294 | self.client.report_unihash(taskhash9, self.METHOD, outhash9, unihash9) | ||
295 | |||
296 | result = down_client.get_taskhash(self.METHOD, taskhash9, False) | ||
297 | self.assertEqual(result['unihash'], unihash9, 'Server failed to copy unihash from upstream') | ||
298 | self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') | ||
299 | self.assertEqual(result['method'], self.METHOD) | ||
300 | |||
263 | def test_ro_server(self): | 301 | def test_ro_server(self): |
264 | (ro_client, ro_server) = self.start_server(dbpath=self.server.dbpath, read_only=True) | 302 | (ro_client, ro_server) = self.start_server(dbpath=self.server.dbpath, read_only=True) |
265 | 303 | ||
@@ -287,10 +325,8 @@ class HashEquivalenceCommonTests(object): | |||
287 | 325 | ||
288 | 326 | ||
289 | def test_slow_server_start(self): | 327 | def test_slow_server_start(self): |
290 | """ | 328 | # Ensures that the server will exit correctly even if it gets a SIGTERM |
291 | Ensures that the server will exit correctly even if it gets a SIGTERM | 329 | # before entering the main loop |
292 | before entering the main loop | ||
293 | """ | ||
294 | 330 | ||
295 | event = multiprocessing.Event() | 331 | event = multiprocessing.Event() |
296 | 332 | ||