author     Joshua Watt <JPEWhacker@gmail.com>                    2021-10-07 14:32:00 -0500
committer  Richard Purdie <richard.purdie@linuxfoundation.org>   2021-10-11 11:00:06 +0100
commit     c7c47bb0d2c2263c4668c4269c669b282d6168d9 (patch)
tree       19767cdfc5be31dbdef064439700891d091853f6 /bitbake
parent     ecb11a6848b4a86de3f04d7dcf12ce8ff6491a07 (diff)
download   poky-c7c47bb0d2c2263c4668c4269c669b282d6168d9.tar.gz
bitbake: hashserv: Fix diverging report race condition
Fixes the hash equivalence server to resolve the diverging report race
error. This error occurs when the same task (hash) is run simultaneously
on two different builders, the results are reported back but the hashes
diverge (e.g. they have different outhashes), and one outhash is
equivalent to an existing hash while the other is not. If the taskhash
was not originally in the database, the client falls back to using the
taskhash as the suggested unihash, so the server sees reports come in
like:
    taskhash: A
    unihash: A
    outhash: B

    taskhash: C
    unihash: C
    outhash: B

    taskhash: C
    unihash: C
    outhash: D
Note that the second and third reports have the same taskhash but
diverging outhashes. Taskhash C should be equivalent to taskhash (and
unihash) A because they share outhash B, but the server would not make
this association when tasks were reported in the order shown.
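
In outline, the fixed handle_report() (full patch below) now resolves
this case as follows. This is a simplified paraphrase of the new server
code, not the verbatim implementation; method, outhash, taskhash and
outhash_data stand in for the values unpacked from the client's report:

    # Record the outhash; OR IGNORE keeps the first (oldest) row for a
    # given (method, taskhash, outhash).
    (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)
    if inserted:
        # Find the oldest *other* taskhash with the same outhash that
        # already has a unihash assigned.
        cursor.execute(
            '''SELECT unihashes_v2.unihash AS unihash FROM outhashes_v2
               INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method
                   AND unihashes_v2.taskhash=outhashes_v2.taskhash
               WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
                   AND outhashes_v2.taskhash!=:taskhash
               ORDER BY outhashes_v2.created ASC LIMIT 1''',
            {'method': method, 'outhash': outhash, 'taskhash': taskhash})
        row = cursor.fetchone()
        if row is not None:
            # An equivalent outhash already exists, so REPLACE any unihash
            # recorded for this taskhash; taskhash C above is thereby
            # remapped to unihash A.
            insert_unihash(cursor, {'method': method, 'taskhash': taskhash,
                                    'unihash': row['unihash']}, Resolve.REPLACE)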
While fixing this, it became clear that a single large table storing all
reported hashes was going to make these updates difficult, since
updating the unihash of all entries would be complex and time-consuming.
Instead, it makes more sense to split the database into two tables: one
that maps taskhashes to unihashes, and one that maps outhashes to
taskhashes. This should also improve the parsing query times, since
those queries only care about the taskhash-to-unihash table, at the cost
of more complex INNER JOIN queries on the lesser-used API.
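
For reference, the CREATE TABLE statements that the new _make_table()
helper below expands to are roughly the following (reformatted here for
readability), and the outhash-side API resolves a unihash with an INNER
JOIN such as the one from the patch:

    CREATE TABLE IF NOT EXISTS unihashes_v2 (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        method TEXT NOT NULL, taskhash TEXT NOT NULL, unihash TEXT NOT NULL,
        UNIQUE(method, taskhash)
    );

    CREATE TABLE IF NOT EXISTS outhashes_v2 (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        method TEXT NOT NULL, taskhash TEXT NOT NULL, outhash TEXT NOT NULL,
        created DATETIME, owner TEXT, PN TEXT, PV TEXT, PR TEXT, task TEXT,
        outhash_siginfo TEXT,
        UNIQUE(method, taskhash, outhash)
    );

    -- The lesser-used outhash lookup joins the two tables:
    SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
    INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method
        AND unihashes_v2.taskhash=outhashes_v2.taskhash
    WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
    ORDER BY outhashes_v2.created ASC LIMIT 1;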
Note that this change deletes the existing hash equivalence data and
starts new database tables rather than converting the existing data.
(Bitbake rev: dff5a17558e2476064e85f35bad1fd65fec23600)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rw-r--r--  bitbake/lib/hashserv/__init__.py |  66
-rw-r--r--  bitbake/lib/hashserv/client.py   |   1
-rw-r--r--  bitbake/lib/hashserv/server.py   | 340
-rw-r--r--  bitbake/lib/hashserv/tests.py    |  60
4 files changed, 314 insertions(+), 153 deletions(-)
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 5f2e101e52..9cb3fd57a5 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -22,46 +22,68 @@ ADDR_TYPE_TCP = 1
 # is necessary
 DEFAULT_MAX_CHUNK = 32 * 1024
 
-TABLE_DEFINITION = (
-    ("method", "TEXT NOT NULL"),
-    ("outhash", "TEXT NOT NULL"),
-    ("taskhash", "TEXT NOT NULL"),
-    ("unihash", "TEXT NOT NULL"),
-    ("created", "DATETIME"),
+UNIHASH_TABLE_DEFINITION = (
+    ("method", "TEXT NOT NULL", "UNIQUE"),
+    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
+    ("unihash", "TEXT NOT NULL", ""),
+)
+
+UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)
+
+OUTHASH_TABLE_DEFINITION = (
+    ("method", "TEXT NOT NULL", "UNIQUE"),
+    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
+    ("outhash", "TEXT NOT NULL", "UNIQUE"),
+    ("created", "DATETIME", ""),
 
     # Optional fields
-    ("owner", "TEXT"),
-    ("PN", "TEXT"),
-    ("PV", "TEXT"),
-    ("PR", "TEXT"),
-    ("task", "TEXT"),
-    ("outhash_siginfo", "TEXT"),
+    ("owner", "TEXT", ""),
+    ("PN", "TEXT", ""),
+    ("PV", "TEXT", ""),
+    ("PR", "TEXT", ""),
+    ("task", "TEXT", ""),
+    ("outhash_siginfo", "TEXT", ""),
 )
 
-TABLE_COLUMNS = tuple(name for name, _ in TABLE_DEFINITION)
+OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)
+
+def _make_table(cursor, name, definition):
+    cursor.execute('''
+        CREATE TABLE IF NOT EXISTS {name} (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            {fields}
+            UNIQUE({unique})
+            )
+        '''.format(
+            name=name,
+            fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
+            unique=", ".join(name for name, _, flags in definition if "UNIQUE" in flags)
+        ))
+
 
 def setup_database(database, sync=True):
     db = sqlite3.connect(database)
     db.row_factory = sqlite3.Row
 
     with closing(db.cursor()) as cursor:
-        cursor.execute('''
-            CREATE TABLE IF NOT EXISTS tasks_v2 (
-                id INTEGER PRIMARY KEY AUTOINCREMENT,
-                %s
-                UNIQUE(method, outhash, taskhash)
-                )
-            ''' % " ".join("%s %s," % (name, typ) for name, typ in TABLE_DEFINITION))
+        _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION)
+        _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
+
         cursor.execute('PRAGMA journal_mode = WAL')
         cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))
 
         # Drop old indexes
         cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
         cursor.execute('DROP INDEX IF EXISTS outhash_lookup')
+        cursor.execute('DROP INDEX IF EXISTS taskhash_lookup_v2')
+        cursor.execute('DROP INDEX IF EXISTS outhash_lookup_v2')
+
+        # TODO: Upgrade from tasks_v2?
+        cursor.execute('DROP TABLE IF EXISTS tasks_v2')
 
         # Create new indexes
-        cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)')
-        cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)')
+        cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)')
+        cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)')
 
     return db
 
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 8cfd90d6a8..b2aa1026ac 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -111,6 +111,7 @@ class Client(bb.asyncrpc.Client):
             "report_unihash",
             "report_unihash_equiv",
             "get_taskhash",
+            "get_outhash",
             "get_stats",
             "reset_stats",
             "backfill_wait",
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index a059e52115..ef8227d430 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -5,11 +5,12 @@
 
 from contextlib import closing, contextmanager
 from datetime import datetime
+import enum
 import asyncio
 import logging
 import math
 import time
-from . import create_async_client, TABLE_COLUMNS
+from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
 import bb.asyncrpc
 
 
@@ -106,56 +107,64 @@ class Stats(object):
         return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
 
 
-def insert_task(cursor, data, ignore=False):
+@enum.unique
+class Resolve(enum.Enum):
+    FAIL = enum.auto()
+    IGNORE = enum.auto()
+    REPLACE = enum.auto()
+
+
+def insert_table(cursor, table, data, on_conflict):
+    resolve = {
+        Resolve.FAIL: "",
+        Resolve.IGNORE: " OR IGNORE",
+        Resolve.REPLACE: " OR REPLACE",
+    }[on_conflict]
+
     keys = sorted(data.keys())
-    query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
-        " OR IGNORE" if ignore else "",
-        ', '.join(keys),
-        ', '.join(':' + k for k in keys))
+    query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format(
+        resolve=resolve,
+        table=table,
+        fields=", ".join(keys),
+        values=", ".join(":" + k for k in keys),
+    )
+    prevrowid = cursor.lastrowid
     cursor.execute(query, data)
-
-async def copy_from_upstream(client, db, method, taskhash):
-    d = await client.get_taskhash(method, taskhash, True)
+    logging.debug(
+        "Inserting %r into %s, %s",
+        data,
+        table,
+        on_conflict
+    )
+    return (cursor.lastrowid, cursor.lastrowid != prevrowid)
+
+def insert_unihash(cursor, data, on_conflict):
+    return insert_table(cursor, "unihashes_v2", data, on_conflict)
+
+def insert_outhash(cursor, data, on_conflict):
+    return insert_table(cursor, "outhashes_v2", data, on_conflict)
+
+async def copy_unihash_from_upstream(client, db, method, taskhash):
+    d = await client.get_taskhash(method, taskhash)
     if d is not None:
-        # Filter out unknown columns
-        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
-
         with closing(db.cursor()) as cursor:
-            insert_task(cursor, d)
+            insert_unihash(
+                cursor,
+                {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
+                Resolve.IGNORE,
+            )
             db.commit()
-
     return d
 
-async def copy_outhash_from_upstream(client, db, method, outhash, taskhash):
-    d = await client.get_outhash(method, outhash, taskhash)
-    if d is not None:
-        # Filter out unknown columns
-        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
 
-        with closing(db.cursor()) as cursor:
-            insert_task(cursor, d)
-            db.commit()
+class ServerCursor(object):
+    def __init__(self, db, cursor, upstream):
+        self.db = db
+        self.cursor = cursor
+        self.upstream = upstream
 
-    return d
 
 class ServerClient(bb.asyncrpc.AsyncServerConnection):
-    FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
-    ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
-    OUTHASH_QUERY = '''
-        -- Find tasks with a matching outhash (that is, tasks that
-        -- are equivalent)
-        SELECT * FROM tasks_v2 WHERE method=:method AND outhash=:outhash
-
-        -- If there is an exact match on the taskhash, return it.
-        -- Otherwise return the oldest matching outhash of any
-        -- taskhash
-        ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
-            created ASC
-
-        -- Only return one row
-        LIMIT 1
-        '''
-
     def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
         super().__init__(reader, writer, 'OEHASHEQUIV', logger)
         self.db = db
@@ -210,36 +219,102 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
     async def handle_get(self, request):
         method = request['method']
         taskhash = request['taskhash']
+        fetch_all = request.get('all', False)
 
-        if request.get('all', False):
-            row = self.query_equivalent(method, taskhash, self.ALL_QUERY)
-        else:
-            row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
+        with closing(self.db.cursor()) as cursor:
+            d = await self.get_unihash(cursor, method, taskhash, fetch_all)
 
-        if row is not None:
-            logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
-            d = {k: row[k] for k in row.keys()}
-        elif self.upstream_client is not None:
-            d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash)
+        self.write_message(d)
+
+    async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
+        d = None
+
+        if fetch_all:
+            cursor.execute(
+                '''
+                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                ''',
+                {
+                    'method': method,
+                    'taskhash': taskhash,
+                }
+
+            )
+            row = cursor.fetchone()
+
+            if row is not None:
+                d = {k: row[k] for k in row.keys()}
+            elif self.upstream_client is not None:
+                d = await self.upstream_client.get_taskhash(method, taskhash, True)
+                self.update_unified(cursor, d)
+                self.db.commit()
         else:
-            d = None
+            row = self.query_equivalent(cursor, method, taskhash)
+
+            if row is not None:
+                d = {k: row[k] for k in row.keys()}
+            elif self.upstream_client is not None:
+                d = await self.upstream_client.get_taskhash(method, taskhash)
+                d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
+                insert_unihash(cursor, d, Resolve.IGNORE)
+                self.db.commit()
 
-        self.write_message(d)
+        return d
 
     async def handle_get_outhash(self, request):
+        method = request['method']
+        outhash = request['outhash']
+        taskhash = request['taskhash']
+
         with closing(self.db.cursor()) as cursor:
-            cursor.execute(self.OUTHASH_QUERY,
-                           {k: request[k] for k in ('method', 'outhash', 'taskhash')})
+            d = await self.get_outhash(cursor, method, outhash, taskhash)
 
-            row = cursor.fetchone()
+        self.write_message(d)
+
+    async def get_outhash(self, cursor, method, outhash, taskhash):
+        d = None
+        cursor.execute(
+            '''
+            SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+            INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+            WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
+            ORDER BY outhashes_v2.created ASC
+            LIMIT 1
+            ''',
+            {
+                'method': method,
+                'outhash': outhash,
+            }
+        )
+        row = cursor.fetchone()
 
         if row is not None:
-            logger.debug('Found equivalent outhash %s -> %s', (row['outhash'], row['unihash']))
             d = {k: row[k] for k in row.keys()}
-        else:
-            d = None
+        elif self.upstream_client is not None:
+            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
+            self.update_unified(cursor, d)
+            self.db.commit()
 
-        self.write_message(d)
+        return d
+
+    def update_unified(self, cursor, data):
+        if data is None:
+            return
+
+        insert_unihash(
+            cursor,
+            {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
+            Resolve.IGNORE
+        )
+        insert_outhash(
+            cursor,
+            {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
+            Resolve.IGNORE
+        )
 
     async def handle_get_stream(self, request):
         self.write_message('ok')
@@ -267,7 +342,12 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
             (method, taskhash) = l.split()
             #logger.debug('Looking up %s %s' % (method, taskhash))
-            row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
+            cursor = self.db.cursor()
+            try:
+                row = self.query_equivalent(cursor, method, taskhash)
+            finally:
+                cursor.close()
+
             if row is not None:
                 msg = ('%s\n' % row['unihash']).encode('utf-8')
                 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
@@ -294,55 +374,82 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
     async def handle_report(self, data):
         with closing(self.db.cursor()) as cursor:
-            cursor.execute(self.OUTHASH_QUERY,
-                           {k: data[k] for k in ('method', 'outhash', 'taskhash')})
+            outhash_data = {
+                'method': data['method'],
+                'outhash': data['outhash'],
+                'taskhash': data['taskhash'],
+                'created': datetime.now()
+            }
 
-            row = cursor.fetchone()
+            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
+                if k in data:
+                    outhash_data[k] = data[k]
+
+            # Insert the new entry, unless it already exists
+            (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)
+
+            if inserted:
+                # If this row is new, check if it is equivalent to another
+                # output hash
+                cursor.execute(
+                    '''
+                    SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                    INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                    -- Select any matching output hash except the one we just inserted
+                    WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
+                    -- Pick the oldest hash
+                    ORDER BY outhashes_v2.created ASC
+                    LIMIT 1
+                    ''',
+                    {
+                        'method': data['method'],
+                        'outhash': data['outhash'],
+                        'taskhash': data['taskhash'],
+                    }
+                )
+                row = cursor.fetchone()
 
-            if row is None and self.upstream_client:
-                # Try upstream
-                row = await copy_outhash_from_upstream(self.upstream_client,
-                                                       self.db,
-                                                       data['method'],
-                                                       data['outhash'],
-                                                       data['taskhash'])
-
-            # If no matching outhash was found, or one *was* found but it
-            # wasn't an exact match on the taskhash, a new entry for this
-            # taskhash should be added
-            if row is None or row['taskhash'] != data['taskhash']:
-                # If a row matching the outhash was found, the unihash for
-                # the new taskhash should be the same as that one.
-                # Otherwise the caller provided unihash is used.
-                unihash = data['unihash']
                 if row is not None:
+                    # A matching output hash was found. Set our taskhash to the
+                    # same unihash since they are equivalent
                     unihash = row['unihash']
+                    resolve = Resolve.REPLACE
+                else:
+                    # No matching output hash was found. This is probably the
+                    # first outhash to be added.
+                    unihash = data['unihash']
+                    resolve = Resolve.IGNORE
+
+                    # Query upstream to see if it has a unihash we can use
+                    if self.upstream_client is not None:
+                        upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash'])
+                        if upstream_data is not None:
+                            unihash = upstream_data['unihash']
+
+
+                insert_unihash(
+                    cursor,
+                    {
+                        'method': data['method'],
+                        'taskhash': data['taskhash'],
+                        'unihash': unihash,
+                    },
+                    resolve
+                )
+
+            unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash'])
+            if unihash_data is not None:
+                unihash = unihash_data['unihash']
+            else:
+                unihash = data['unihash']
 
-                insert_data = {
-                    'method': data['method'],
-                    'outhash': data['outhash'],
-                    'taskhash': data['taskhash'],
-                    'unihash': unihash,
-                    'created': datetime.now()
-                }
-
-                for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
-                    if k in data:
-                        insert_data[k] = data[k]
-
-                insert_task(cursor, insert_data)
-                self.db.commit()
-
-                logger.info('Adding taskhash %s with unihash %s',
-                            data['taskhash'], unihash)
+            self.db.commit()
 
-                d = {
-                    'taskhash': data['taskhash'],
-                    'method': data['method'],
-                    'unihash': unihash
-                }
-            else:
-                d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
+            d = {
+                'taskhash': data['taskhash'],
+                'method': data['method'],
+                'unihash': unihash,
+            }
 
         self.write_message(d)
 
@@ -350,23 +457,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         with closing(self.db.cursor()) as cursor:
             insert_data = {
                 'method': data['method'],
-                'outhash': "",
                 'taskhash': data['taskhash'],
                 'unihash': data['unihash'],
-                'created': datetime.now()
             }
-
-            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
-                if k in data:
-                    insert_data[k] = data[k]
-
-            insert_task(cursor, insert_data, ignore=True)
+            insert_unihash(cursor, insert_data, Resolve.IGNORE)
             self.db.commit()
 
             # Fetch the unihash that will be reported for the taskhash. If the
             # unihash matches, it means this row was inserted (or the mapping
             # was already valid)
-            row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY)
+            row = self.query_equivalent(cursor, data['method'], data['taskhash'])
 
             if row['unihash'] == data['unihash']:
                 logger.info('Adding taskhash equivalence for %s with unihash %s',
@@ -399,14 +499,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         await self.backfill_queue.join()
         self.write_message(d)
 
-    def query_equivalent(self, method, taskhash, query):
+    def query_equivalent(self, cursor, method, taskhash):
         # This is part of the inner loop and must be as fast as possible
-        try:
-            cursor = self.db.cursor()
-            cursor.execute(query, {'method': method, 'taskhash': taskhash})
-            return cursor.fetchone()
-        except:
-            cursor.close()
+        cursor.execute(
+            'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash',
+            {
+                'method': method,
+                'taskhash': taskhash,
+            }
+        )
+        return cursor.fetchone()
 
 
 class Server(bb.asyncrpc.AsyncServer):
@@ -435,7 +537,7 @@ class Server(bb.asyncrpc.AsyncServer):
                     self.backfill_queue.task_done()
                     break
                 method, taskhash = item
-                await copy_from_upstream(client, self.db, method, taskhash)
+                await copy_unihash_from_upstream(client, self.db, method, taskhash)
                 self.backfill_queue.task_done()
             finally:
                 await client.close()
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 1fcfb6b929..efaf3bdf42 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -19,10 +19,10 @@ import time
 import signal
 
 def server_prefunc(server, idx):
-    logging.basicConfig(level=logging.DEBUG, filename='bbhashserv.log', filemode='w',
+    logging.basicConfig(level=logging.DEBUG, filename='bbhashserv-%d.log' % idx, filemode='w',
                         format='%(levelname)s %(filename)s:%(lineno)d %(message)s')
     server.logger.debug("Running server %d" % idx)
-    sys.stdout = open('bbhashserv-%d.log' % idx, 'w')
+    sys.stdout = open('bbhashserv-stdout-%d.log' % idx, 'w')
     sys.stderr = sys.stdout
 
 class HashEquivalenceTestSetup(object):
@@ -140,12 +140,17 @@ class HashEquivalenceCommonTests(object):
         })
         self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
 
-        result = self.client.get_taskhash(self.METHOD, taskhash, True)
-        self.assertEqual(result['taskhash'], taskhash)
-        self.assertEqual(result['unihash'], unihash)
-        self.assertEqual(result['method'], self.METHOD)
-        self.assertEqual(result['outhash'], outhash)
-        self.assertEqual(result['outhash_siginfo'], siginfo)
+        result_unihash = self.client.get_taskhash(self.METHOD, taskhash, True)
+        self.assertEqual(result_unihash['taskhash'], taskhash)
+        self.assertEqual(result_unihash['unihash'], unihash)
+        self.assertEqual(result_unihash['method'], self.METHOD)
+
+        result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash)
+        self.assertEqual(result_outhash['taskhash'], taskhash)
+        self.assertEqual(result_outhash['method'], self.METHOD)
+        self.assertEqual(result_outhash['unihash'], unihash)
+        self.assertEqual(result_outhash['outhash'], outhash)
+        self.assertEqual(result_outhash['outhash_siginfo'], siginfo)
 
     def test_stress(self):
         def query_server(failures):
@@ -260,6 +265,39 @@ class HashEquivalenceCommonTests(object):
         result = down_client.report_unihash(taskhash6, self.METHOD, outhash5, unihash6)
         self.assertEqual(result['unihash'], unihash5, 'Server failed to copy unihash from upstream')
 
+        # Tests read through from server with
+        taskhash7 = '9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74'
+        outhash7 = '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69'
+        unihash7 = '05d2a63c81e32f0a36542ca677e8ad852365c538'
+        self.client.report_unihash(taskhash7, self.METHOD, outhash7, unihash7)
+
+        result = down_client.get_taskhash(self.METHOD, taskhash7, True)
+        self.assertEqual(result['unihash'], unihash7, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['outhash'], outhash7, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['taskhash'], taskhash7, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['method'], self.METHOD)
+
+        taskhash8 = '86978a4c8c71b9b487330b0152aade10c1ee58aa'
+        outhash8 = 'ca8c128e9d9e4a28ef24d0508aa20b5cf880604eacd8f65c0e366f7e0cc5fbcf'
+        unihash8 = 'd8bcf25369d40590ad7d08c84d538982f2023e01'
+        self.client.report_unihash(taskhash8, self.METHOD, outhash8, unihash8)
+
+        result = down_client.get_outhash(self.METHOD, outhash8, taskhash8)
+        self.assertEqual(result['unihash'], unihash8, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['outhash'], outhash8, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['taskhash'], taskhash8, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['method'], self.METHOD)
+
+        taskhash9 = 'ae6339531895ddf5b67e663e6a374ad8ec71d81c'
+        outhash9 = 'afc78172c81880ae10a1fec994b5b4ee33d196a001a1b66212a15ebe573e00b5'
+        unihash9 = '6662e699d6e3d894b24408ff9a4031ef9b038ee8'
+        self.client.report_unihash(taskhash9, self.METHOD, outhash9, unihash9)
+
+        result = down_client.get_taskhash(self.METHOD, taskhash9, False)
+        self.assertEqual(result['unihash'], unihash9, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream')
+        self.assertEqual(result['method'], self.METHOD)
+
     def test_ro_server(self):
         (ro_client, ro_server) = self.start_server(dbpath=self.server.dbpath, read_only=True)
 
@@ -287,10 +325,8 @@ class HashEquivalenceCommonTests(object):
 
 
     def test_slow_server_start(self):
-        """
-        Ensures that the server will exit correctly even if it gets a SIGTERM
-        before entering the main loop
-        """
+        # Ensures that the server will exit correctly even if it gets a SIGTERM
+        # before entering the main loop
 
         event = multiprocessing.Event()
 