author    Joshua Watt <JPEWhacker@gmail.com>  2021-10-07 14:32:00 -0500
committer Richard Purdie <richard.purdie@linuxfoundation.org>  2021-10-11 11:00:06 +0100
commit    c7c47bb0d2c2263c4668c4269c669b282d6168d9 (patch)
tree      19767cdfc5be31dbdef064439700891d091853f6 /bitbake
parent    ecb11a6848b4a86de3f04d7dcf12ce8ff6491a07 (diff)
bitbake: hashserv: Fix diverging report race condition
Fixes the hash equivalence server to resolve the diverging report race
error. This error occurs when the same task (hash) is run simultaneously
on two different builders and the results are reported back, but the
hashes diverge (e.g. have different outhashes), and one outhash is
equivalent to an existing hash while the other is not. If the taskhash
was not originally in the database, the client falls back to using the
taskhash as the suggested unihash, so the server sees reports come in
like:

    taskhash: A    unihash: A    outhash: B
    taskhash: C    unihash: C    outhash: B
    taskhash: C    unihash: C    outhash: D

Note that the second and third reports carry the same taskhash with
diverging outhashes. Taskhash C should be equivalent to taskhash (and
unihash) A because they share outhash B, but the server would not make
that association when the tasks were reported in the order shown.

It became clear while trying to fix this that a single large table
storing all reported hashes would make these updates difficult, since
updating the unihash of all entries would be complex and time consuming.
Instead, it makes more sense to split the database into two tables: one
that maps taskhashes to unihashes, and one that maps outhashes to
taskhashes. This should also improve parsing query times, since those
queries only touch the taskhash-to-unihash table, at the cost of more
complex INNER JOIN queries on the less frequently used API.

Note that this change deletes existing hash equivalence data and starts
a new database table rather than converting the existing data.

(Bitbake rev: dff5a17558e2476064e85f35bad1fd65fec23600)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
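To make the race and the fix concrete, the following standalone sketch
replays the reports above against a minimal version of the split schema.
This is an illustration only, not code from the patch: the table layout
mirrors unihashes_v2/outhashes_v2, but the report() helper is
hypothetical and simplified (the real server also gates the equivalence
lookup on whether the outhash row was newly inserted, consults an
upstream server, and stores extra metadata such as created timestamps).

    import sqlite3

    # Minimal stand-ins for the unihashes_v2/outhashes_v2 tables from the patch
    db = sqlite3.connect(":memory:")
    db.executescript("""
        CREATE TABLE unihashes_v2 (
            method TEXT NOT NULL, taskhash TEXT NOT NULL, unihash TEXT NOT NULL,
            UNIQUE(method, taskhash));
        CREATE TABLE outhashes_v2 (
            method TEXT NOT NULL, taskhash TEXT NOT NULL, outhash TEXT NOT NULL,
            UNIQUE(method, taskhash, outhash));
    """)

    def report(method, taskhash, unihash, outhash):
        # Hypothetical, simplified version of the server's handle_report()
        # logic: record the outhash, then look for another taskhash that
        # produced the same outhash (i.e. an equivalent task).
        cur = db.cursor()
        cur.execute("INSERT OR IGNORE INTO outhashes_v2 VALUES (?, ?, ?)",
                    (method, taskhash, outhash))
        row = cur.execute(
            """SELECT unihashes_v2.unihash FROM outhashes_v2
               INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method
                   AND unihashes_v2.taskhash=outhashes_v2.taskhash
               WHERE outhashes_v2.method=? AND outhashes_v2.outhash=?
                   AND outhashes_v2.taskhash!=?""",
            (method, outhash, taskhash)).fetchone()
        if row is not None:
            # An equivalent outhash exists: adopt its unihash, replacing any
            # diverged mapping recorded earlier for this taskhash (the
            # Resolve.REPLACE case in the patch).
            cur.execute("INSERT OR REPLACE INTO unihashes_v2 VALUES (?, ?, ?)",
                        (method, taskhash, row[0]))
        else:
            # First sighting of this outhash: keep any existing mapping,
            # otherwise fall back to the client's suggested unihash (the
            # Resolve.IGNORE case in the patch).
            cur.execute("INSERT OR IGNORE INTO unihashes_v2 VALUES (?, ?, ?)",
                        (method, taskhash, unihash))
        db.commit()
        return cur.execute(
            "SELECT unihash FROM unihashes_v2 WHERE method=? AND taskhash=?",
            (method, taskhash)).fetchone()[0]

    print(report("m", "A", "A", "B"))  # A
    print(report("m", "C", "C", "D"))  # C: outhash D matches nothing yet
    print(report("m", "C", "C", "B"))  # A: B matches taskhash A, so C -> A

The last call is the previously broken ordering: taskhash C had already
diverged to unihash C, and only a REPLACE on the taskhash-to-unihash
table can repair it, which is why a single table keyed on (method,
outhash, taskhash) made the fix awkward.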
Diffstat (limited to 'bitbake')
-rw-r--r--  bitbake/lib/hashserv/__init__.py  |  66
-rw-r--r--  bitbake/lib/hashserv/client.py    |   1
-rw-r--r--  bitbake/lib/hashserv/server.py    | 340
-rw-r--r--  bitbake/lib/hashserv/tests.py     |  60
4 files changed, 314 insertions(+), 153 deletions(-)
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 5f2e101e52..9cb3fd57a5 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -22,46 +22,68 @@ ADDR_TYPE_TCP = 1
 # is necessary
 DEFAULT_MAX_CHUNK = 32 * 1024
 
-TABLE_DEFINITION = (
-    ("method", "TEXT NOT NULL"),
-    ("outhash", "TEXT NOT NULL"),
-    ("taskhash", "TEXT NOT NULL"),
-    ("unihash", "TEXT NOT NULL"),
-    ("created", "DATETIME"),
+UNIHASH_TABLE_DEFINITION = (
+    ("method", "TEXT NOT NULL", "UNIQUE"),
+    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
+    ("unihash", "TEXT NOT NULL", ""),
+)
+
+UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)
+
+OUTHASH_TABLE_DEFINITION = (
+    ("method", "TEXT NOT NULL", "UNIQUE"),
+    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
+    ("outhash", "TEXT NOT NULL", "UNIQUE"),
+    ("created", "DATETIME", ""),
 
     # Optional fields
-    ("owner", "TEXT"),
-    ("PN", "TEXT"),
-    ("PV", "TEXT"),
-    ("PR", "TEXT"),
-    ("task", "TEXT"),
-    ("outhash_siginfo", "TEXT"),
+    ("owner", "TEXT", ""),
+    ("PN", "TEXT", ""),
+    ("PV", "TEXT", ""),
+    ("PR", "TEXT", ""),
+    ("task", "TEXT", ""),
+    ("outhash_siginfo", "TEXT", ""),
 )
 
-TABLE_COLUMNS = tuple(name for name, _ in TABLE_DEFINITION)
+OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)
+
+def _make_table(cursor, name, definition):
+    cursor.execute('''
+        CREATE TABLE IF NOT EXISTS {name} (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            {fields}
+            UNIQUE({unique})
+            )
+        '''.format(
+            name=name,
+            fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
+            unique=", ".join(name for name, _, flags in definition if "UNIQUE" in flags)
+        ))
+
 
 def setup_database(database, sync=True):
     db = sqlite3.connect(database)
     db.row_factory = sqlite3.Row
 
     with closing(db.cursor()) as cursor:
-        cursor.execute('''
-            CREATE TABLE IF NOT EXISTS tasks_v2 (
-                id INTEGER PRIMARY KEY AUTOINCREMENT,
-                %s
-                UNIQUE(method, outhash, taskhash)
-                )
-            ''' % " ".join("%s %s," % (name, typ) for name, typ in TABLE_DEFINITION))
+        _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION)
+        _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
+
         cursor.execute('PRAGMA journal_mode = WAL')
         cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))
 
         # Drop old indexes
         cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
         cursor.execute('DROP INDEX IF EXISTS outhash_lookup')
+        cursor.execute('DROP INDEX IF EXISTS taskhash_lookup_v2')
+        cursor.execute('DROP INDEX IF EXISTS outhash_lookup_v2')
+
+        # TODO: Upgrade from tasks_v2?
+        cursor.execute('DROP TABLE IF EXISTS tasks_v2')
 
         # Create new indexes
-        cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)')
-        cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)')
+        cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)')
+        cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)')
 
     return db
 
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 8cfd90d6a8..b2aa1026ac 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -111,6 +111,7 @@ class Client(bb.asyncrpc.Client):
111 "report_unihash", 111 "report_unihash",
112 "report_unihash_equiv", 112 "report_unihash_equiv",
113 "get_taskhash", 113 "get_taskhash",
114 "get_outhash",
114 "get_stats", 115 "get_stats",
115 "reset_stats", 116 "reset_stats",
116 "backfill_wait", 117 "backfill_wait",
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index a059e52115..ef8227d430 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -5,11 +5,12 @@
 
 from contextlib import closing, contextmanager
 from datetime import datetime
+import enum
 import asyncio
 import logging
 import math
 import time
-from . import create_async_client, TABLE_COLUMNS
+from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
 import bb.asyncrpc
 
 
@@ -106,56 +107,64 @@ class Stats(object):
         return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
 
 
-def insert_task(cursor, data, ignore=False):
+@enum.unique
+class Resolve(enum.Enum):
+    FAIL = enum.auto()
+    IGNORE = enum.auto()
+    REPLACE = enum.auto()
+
+
+def insert_table(cursor, table, data, on_conflict):
+    resolve = {
+        Resolve.FAIL: "",
+        Resolve.IGNORE: " OR IGNORE",
+        Resolve.REPLACE: " OR REPLACE",
+    }[on_conflict]
+
     keys = sorted(data.keys())
-    query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
-        " OR IGNORE" if ignore else "",
-        ', '.join(keys),
-        ', '.join(':' + k for k in keys))
+    query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format(
+        resolve=resolve,
+        table=table,
+        fields=", ".join(keys),
+        values=", ".join(":" + k for k in keys),
+    )
+    prevrowid = cursor.lastrowid
     cursor.execute(query, data)
-
-async def copy_from_upstream(client, db, method, taskhash):
-    d = await client.get_taskhash(method, taskhash, True)
+    logging.debug(
+        "Inserting %r into %s, %s",
+        data,
+        table,
+        on_conflict
+    )
+    return (cursor.lastrowid, cursor.lastrowid != prevrowid)
+
+def insert_unihash(cursor, data, on_conflict):
+    return insert_table(cursor, "unihashes_v2", data, on_conflict)
+
+def insert_outhash(cursor, data, on_conflict):
+    return insert_table(cursor, "outhashes_v2", data, on_conflict)
+
+async def copy_unihash_from_upstream(client, db, method, taskhash):
+    d = await client.get_taskhash(method, taskhash)
     if d is not None:
-        # Filter out unknown columns
-        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
-
         with closing(db.cursor()) as cursor:
-            insert_task(cursor, d)
+            insert_unihash(
+                cursor,
+                {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
+                Resolve.IGNORE,
+            )
             db.commit()
-
     return d
 
-async def copy_outhash_from_upstream(client, db, method, outhash, taskhash):
-    d = await client.get_outhash(method, outhash, taskhash)
-    if d is not None:
-        # Filter out unknown columns
-        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
 
-        with closing(db.cursor()) as cursor:
-            insert_task(cursor, d)
-            db.commit()
+class ServerCursor(object):
+    def __init__(self, db, cursor, upstream):
+        self.db = db
+        self.cursor = cursor
+        self.upstream = upstream
 
-    return d
 
 class ServerClient(bb.asyncrpc.AsyncServerConnection):
-    FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
-    ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
-    OUTHASH_QUERY = '''
-        -- Find tasks with a matching outhash (that is, tasks that
-        -- are equivalent)
-        SELECT * FROM tasks_v2 WHERE method=:method AND outhash=:outhash
-
-        -- If there is an exact match on the taskhash, return it.
-        -- Otherwise return the oldest matching outhash of any
-        -- taskhash
-        ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
-            created ASC
-
-        -- Only return one row
-        LIMIT 1
-        '''
-
     def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
         super().__init__(reader, writer, 'OEHASHEQUIV', logger)
         self.db = db
@@ -210,36 +219,102 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
     async def handle_get(self, request):
         method = request['method']
         taskhash = request['taskhash']
+        fetch_all = request.get('all', False)
 
-        if request.get('all', False):
-            row = self.query_equivalent(method, taskhash, self.ALL_QUERY)
-        else:
-            row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
+        with closing(self.db.cursor()) as cursor:
+            d = await self.get_unihash(cursor, method, taskhash, fetch_all)
 
-        if row is not None:
-            logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
-            d = {k: row[k] for k in row.keys()}
-        elif self.upstream_client is not None:
-            d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash)
+        self.write_message(d)
+
+    async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
+        d = None
+
+        if fetch_all:
+            cursor.execute(
+                '''
+                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                ''',
+                {
+                    'method': method,
+                    'taskhash': taskhash,
+                }
+
+            )
+            row = cursor.fetchone()
+
+            if row is not None:
+                d = {k: row[k] for k in row.keys()}
+            elif self.upstream_client is not None:
+                d = await self.upstream_client.get_taskhash(method, taskhash, True)
+                self.update_unified(cursor, d)
+                self.db.commit()
         else:
-            d = None
+            row = self.query_equivalent(cursor, method, taskhash)
+
+            if row is not None:
+                d = {k: row[k] for k in row.keys()}
+            elif self.upstream_client is not None:
+                d = await self.upstream_client.get_taskhash(method, taskhash)
+                d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
+                insert_unihash(cursor, d, Resolve.IGNORE)
+                self.db.commit()
 
-        self.write_message(d)
+        return d
 
     async def handle_get_outhash(self, request):
+        method = request['method']
+        outhash = request['outhash']
+        taskhash = request['taskhash']
+
         with closing(self.db.cursor()) as cursor:
-            cursor.execute(self.OUTHASH_QUERY,
-                           {k: request[k] for k in ('method', 'outhash', 'taskhash')})
+            d = await self.get_outhash(cursor, method, outhash, taskhash)
 
-            row = cursor.fetchone()
+        self.write_message(d)
+
+    async def get_outhash(self, cursor, method, outhash, taskhash):
+        d = None
+        cursor.execute(
+            '''
+            SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+            INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+            WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
+            ORDER BY outhashes_v2.created ASC
+            LIMIT 1
+            ''',
+            {
+                'method': method,
+                'outhash': outhash,
+            }
+        )
+        row = cursor.fetchone()
 
-            if row is not None:
-                logger.debug('Found equivalent outhash %s -> %s', (row['outhash'], row['unihash']))
-                d = {k: row[k] for k in row.keys()}
-            else:
-                d = None
+        if row is not None:
+            d = {k: row[k] for k in row.keys()}
+        elif self.upstream_client is not None:
+            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
+            self.update_unified(cursor, d)
+            self.db.commit()
 
-            self.write_message(d)
+        return d
+
+    def update_unified(self, cursor, data):
+        if data is None:
+            return
+
+        insert_unihash(
+            cursor,
+            {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
+            Resolve.IGNORE
+        )
+        insert_outhash(
+            cursor,
+            {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
+            Resolve.IGNORE
+        )
 
     async def handle_get_stream(self, request):
         self.write_message('ok')
@@ -267,7 +342,12 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
             (method, taskhash) = l.split()
             #logger.debug('Looking up %s %s' % (method, taskhash))
-            row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
+            cursor = self.db.cursor()
+            try:
+                row = self.query_equivalent(cursor, method, taskhash)
+            finally:
+                cursor.close()
+
             if row is not None:
                 msg = ('%s\n' % row['unihash']).encode('utf-8')
                 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
@@ -294,55 +374,82 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
     async def handle_report(self, data):
         with closing(self.db.cursor()) as cursor:
-            cursor.execute(self.OUTHASH_QUERY,
-                           {k: data[k] for k in ('method', 'outhash', 'taskhash')})
+            outhash_data = {
+                'method': data['method'],
+                'outhash': data['outhash'],
+                'taskhash': data['taskhash'],
+                'created': datetime.now()
+            }
 
-            row = cursor.fetchone()
+            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
+                if k in data:
+                    outhash_data[k] = data[k]
+
+            # Insert the new entry, unless it already exists
+            (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)
+
+            if inserted:
+                # If this row is new, check if it is equivalent to another
+                # output hash
+                cursor.execute(
+                    '''
+                    SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                    INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                    -- Select any matching output hash except the one we just inserted
+                    WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
+                    -- Pick the oldest hash
+                    ORDER BY outhashes_v2.created ASC
+                    LIMIT 1
+                    ''',
+                    {
+                        'method': data['method'],
+                        'outhash': data['outhash'],
+                        'taskhash': data['taskhash'],
+                    }
+                )
+                row = cursor.fetchone()
 
-            if row is None and self.upstream_client:
-                # Try upstream
-                row = await copy_outhash_from_upstream(self.upstream_client,
-                                                       self.db,
-                                                       data['method'],
-                                                       data['outhash'],
-                                                       data['taskhash'])
-
-            # If no matching outhash was found, or one *was* found but it
-            # wasn't an exact match on the taskhash, a new entry for this
-            # taskhash should be added
-            if row is None or row['taskhash'] != data['taskhash']:
-                # If a row matching the outhash was found, the unihash for
-                # the new taskhash should be the same as that one.
-                # Otherwise the caller provided unihash is used.
-                unihash = data['unihash']
                 if row is not None:
+                    # A matching output hash was found. Set our taskhash to the
+                    # same unihash since they are equivalent
                     unihash = row['unihash']
+                    resolve = Resolve.REPLACE
+                else:
+                    # No matching output hash was found. This is probably the
+                    # first outhash to be added.
+                    unihash = data['unihash']
+                    resolve = Resolve.IGNORE
+
+                    # Query upstream to see if it has a unihash we can use
+                    if self.upstream_client is not None:
+                        upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash'])
+                        if upstream_data is not None:
+                            unihash = upstream_data['unihash']
+
+
+                insert_unihash(
+                    cursor,
+                    {
+                        'method': data['method'],
+                        'taskhash': data['taskhash'],
+                        'unihash': unihash,
+                    },
+                    resolve
+                )
+
+            unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash'])
+            if unihash_data is not None:
+                unihash = unihash_data['unihash']
+            else:
+                unihash = data['unihash']
 
-                insert_data = {
-                    'method': data['method'],
-                    'outhash': data['outhash'],
-                    'taskhash': data['taskhash'],
-                    'unihash': unihash,
-                    'created': datetime.now()
-                }
-
-                for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
-                    if k in data:
-                        insert_data[k] = data[k]
-
-                insert_task(cursor, insert_data)
-                self.db.commit()
-
-                logger.info('Adding taskhash %s with unihash %s',
-                            data['taskhash'], unihash)
+            self.db.commit()
 
-                d = {
-                    'taskhash': data['taskhash'],
-                    'method': data['method'],
-                    'unihash': unihash
-                }
-            else:
-                d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
+        d = {
+            'taskhash': data['taskhash'],
+            'method': data['method'],
+            'unihash': unihash,
+        }
 
         self.write_message(d)
 
@@ -350,23 +457,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         with closing(self.db.cursor()) as cursor:
             insert_data = {
                 'method': data['method'],
-                'outhash': "",
                 'taskhash': data['taskhash'],
                 'unihash': data['unihash'],
-                'created': datetime.now()
             }
-
-            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
-                if k in data:
-                    insert_data[k] = data[k]
-
-            insert_task(cursor, insert_data, ignore=True)
+            insert_unihash(cursor, insert_data, Resolve.IGNORE)
             self.db.commit()
 
         # Fetch the unihash that will be reported for the taskhash. If the
         # unihash matches, it means this row was inserted (or the mapping
         # was already valid)
-        row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY)
+        row = self.query_equivalent(cursor, data['method'], data['taskhash'])
 
         if row['unihash'] == data['unihash']:
             logger.info('Adding taskhash equivalence for %s with unihash %s',
@@ -399,14 +499,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         await self.backfill_queue.join()
         self.write_message(d)
 
-    def query_equivalent(self, method, taskhash, query):
+    def query_equivalent(self, cursor, method, taskhash):
         # This is part of the inner loop and must be as fast as possible
-        try:
-            cursor = self.db.cursor()
-            cursor.execute(query, {'method': method, 'taskhash': taskhash})
-            return cursor.fetchone()
-        except:
-            cursor.close()
+        cursor.execute(
+            'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash',
+            {
+                'method': method,
+                'taskhash': taskhash,
+            }
+        )
+        return cursor.fetchone()
 
 
 class Server(bb.asyncrpc.AsyncServer):
@@ -435,7 +537,7 @@ class Server(bb.asyncrpc.AsyncServer):
                     self.backfill_queue.task_done()
                     break
                 method, taskhash = item
-                await copy_from_upstream(client, self.db, method, taskhash)
+                await copy_unihash_from_upstream(client, self.db, method, taskhash)
                 self.backfill_queue.task_done()
             finally:
                 await client.close()
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 1fcfb6b929..efaf3bdf42 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -19,10 +19,10 @@ import time
 import signal
 
 def server_prefunc(server, idx):
-    logging.basicConfig(level=logging.DEBUG, filename='bbhashserv.log', filemode='w',
+    logging.basicConfig(level=logging.DEBUG, filename='bbhashserv-%d.log' % idx, filemode='w',
                         format='%(levelname)s %(filename)s:%(lineno)d %(message)s')
     server.logger.debug("Running server %d" % idx)
-    sys.stdout = open('bbhashserv-%d.log' % idx, 'w')
+    sys.stdout = open('bbhashserv-stdout-%d.log' % idx, 'w')
     sys.stderr = sys.stdout
 
 class HashEquivalenceTestSetup(object):
@@ -140,12 +140,17 @@ class HashEquivalenceCommonTests(object):
         })
         self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
 
-        result = self.client.get_taskhash(self.METHOD, taskhash, True)
-        self.assertEqual(result['taskhash'], taskhash)
-        self.assertEqual(result['unihash'], unihash)
-        self.assertEqual(result['method'], self.METHOD)
-        self.assertEqual(result['outhash'], outhash)
-        self.assertEqual(result['outhash_siginfo'], siginfo)
+        result_unihash = self.client.get_taskhash(self.METHOD, taskhash, True)
+        self.assertEqual(result_unihash['taskhash'], taskhash)
+        self.assertEqual(result_unihash['unihash'], unihash)
+        self.assertEqual(result_unihash['method'], self.METHOD)
+
+        result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash)
+        self.assertEqual(result_outhash['taskhash'], taskhash)
+        self.assertEqual(result_outhash['method'], self.METHOD)
+        self.assertEqual(result_outhash['unihash'], unihash)
+        self.assertEqual(result_outhash['outhash'], outhash)
+        self.assertEqual(result_outhash['outhash_siginfo'], siginfo)
 
     def test_stress(self):
         def query_server(failures):
@@ -260,6 +265,39 @@ class HashEquivalenceCommonTests(object):
         result = down_client.report_unihash(taskhash6, self.METHOD, outhash5, unihash6)
         self.assertEqual(result['unihash'], unihash5, 'Server failed to copy unihash from upstream')
 
+        # Tests read through from server with
+        taskhash7 = '9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74'
+        outhash7 = '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69'
+        unihash7 = '05d2a63c81e32f0a36542ca677e8ad852365c538'
+        self.client.report_unihash(taskhash7, self.METHOD, outhash7, unihash7)
+
+        result = down_client.get_taskhash(self.METHOD, taskhash7, True)
+        self.assertEqual(result['unihash'], unihash7, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['outhash'], outhash7, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['taskhash'], taskhash7, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['method'], self.METHOD)
+
+        taskhash8 = '86978a4c8c71b9b487330b0152aade10c1ee58aa'
+        outhash8 = 'ca8c128e9d9e4a28ef24d0508aa20b5cf880604eacd8f65c0e366f7e0cc5fbcf'
+        unihash8 = 'd8bcf25369d40590ad7d08c84d538982f2023e01'
+        self.client.report_unihash(taskhash8, self.METHOD, outhash8, unihash8)
+
+        result = down_client.get_outhash(self.METHOD, outhash8, taskhash8)
+        self.assertEqual(result['unihash'], unihash8, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['outhash'], outhash8, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['taskhash'], taskhash8, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['method'], self.METHOD)
+
+        taskhash9 = 'ae6339531895ddf5b67e663e6a374ad8ec71d81c'
+        outhash9 = 'afc78172c81880ae10a1fec994b5b4ee33d196a001a1b66212a15ebe573e00b5'
+        unihash9 = '6662e699d6e3d894b24408ff9a4031ef9b038ee8'
+        self.client.report_unihash(taskhash9, self.METHOD, outhash9, unihash9)
+
+        result = down_client.get_taskhash(self.METHOD, taskhash9, False)
+        self.assertEqual(result['unihash'], unihash9, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['method'], self.METHOD)
+
     def test_ro_server(self):
         (ro_client, ro_server) = self.start_server(dbpath=self.server.dbpath, read_only=True)
 
@@ -287,10 +325,8 @@ class HashEquivalenceCommonTests(object):
 
 
     def test_slow_server_start(self):
-        """
-        Ensures that the server will exit correctly even if it gets a SIGTERM
-        before entering the main loop
-        """
+        # Ensures that the server will exit correctly even if it gets a SIGTERM
+        # before entering the main loop
 
         event = multiprocessing.Event()
 