summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/client.py
diff options
context:
space:
mode:
authorJoshua Watt <jpewhacker@gmail.com>2019-09-17 08:37:11 -0500
committerRichard Purdie <richard.purdie@linuxfoundation.org>2019-09-18 11:52:03 +0100
commit19b60d0e7a01df5b435c7fb17211ad64369acc54 (patch)
tree5d8b57cd47df8aa6da76208080012ffe453a4ac0 /bitbake/lib/hashserv/client.py
parent55bb60614972659ba6f4947ff40749a3b2e38d1a (diff)
downloadpoky-19b60d0e7a01df5b435c7fb17211ad64369acc54.tar.gz
bitbake: bitbake: Rework hash equivalence
Reworks the hash equivalence server to address performance issues that were encountered with the REST mechanism used previously, particularly during the heavy request load encountered during signature generation. Notable changes are: 1) The server protocol is no longer HTTP-based. Instead, it uses a simpler JSON over a streaming protocol link. This protocol has much lower overhead than HTTP since it eliminates the HTTP headers. 2) The hash equivalence server can either bind to a TCP port, or a Unix domain socket. Unix domain sockets are more efficient for local communication, and so are preferred if the user enables hash equivalence only for the local build. The arguments to the 'bitbake-hashserv' command have been updated accordingly. 3) The value to which BB_HASHSERVE should be set to enable a local hash equivalence server is changed to "auto" instead of "localhost:0". The latter didn't make sense when the local server was using a Unix domain socket. 4) For optimal performance, clients are expected to keep a persistent connection to the server instead of creating a new connection each time a request is made. 5) Most of the client logic has been moved to the hashserv module in bitbake. This makes it easier to share the client code. 6) A new bitbake command has been added called 'bitbake-hashclient'. This command can be used to query a hash equivalence server, including fetching the statistics and running a performance stress test. 7) The table indexes in the SQLite database have been updated to optimize hash lookups. This change is backward compatible, as the database will delete the old indexes first if they exist. 8) The server has been reworked to use Python async to maximize performance with persistently connected clients. This requires Python 3.5 or later. (Bitbake rev: 1f404bd23335f6c5f6ca944c5be0b838ffb76c4d) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv/client.py')
-rw-r--r--bitbake/lib/hashserv/client.py150
1 files changed, 150 insertions, 0 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
new file mode 100644
index 0000000000..4d3c35f00c
--- /dev/null
+++ b/bitbake/lib/hashserv/client.py
@@ -0,0 +1,150 @@
1# Copyright (C) 2019 Garmin Ltd.
2#
3# SPDX-License-Identifier: GPL-2.0-only
4#
5
6from contextlib import closing
7import json
8import logging
9import socket
10
11
# Module-level logger; the client reports communication errors through it.
logger = logging.getLogger('hashserv.client')
13
14
class HashConnectionError(Exception):
    """Raised when talking to the hash equivalence server fails.

    The client raises it for a closed connection, a malformed or unexpected
    reply, and as a wrapper around lower-level socket/decoding errors once
    its retries are exhausted.
    """
17
18
class Client(object):
    """Blocking client for the hash equivalence server's line protocol.

    A transport is chosen with connect_tcp() or connect_unix(); the socket
    itself is opened lazily by connect(), which every request calls first.
    A request that fails is retried on a fresh connection a few times before
    HashConnectionError is raised.

    The connection is in one of two modes:

    * MODE_NORMAL: each request is a JSON value on one line and the reply
      is a JSON value on one line.
    * MODE_GET_STREAM: each request is a plain text line and the reply is a
      plain text line; the line "END" switches back to MODE_NORMAL.
    """

    MODE_NORMAL = 0
    MODE_GET_STREAM = 1

    def __init__(self):
        self._socket = None
        self.reader = None
        self.writer = None
        self.mode = self.MODE_NORMAL

    def connect_tcp(self, address, port):
        """Use TCP to (address, port) as transport; connect() opens it."""
        def connect_sock():
            s = socket.create_connection((address, port))
            try:
                s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
                # TCP_QUICKACK is a platform-specific constant; only request
                # it where the socket module actually provides it.
                if hasattr(socket, 'TCP_QUICKACK'):
                    s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
                s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            except BaseException:
                # Do not leak the descriptor if option setup fails.
                s.close()
                raise
            return s

        self._connect_sock = connect_sock

    def connect_unix(self, path):
        """Use the Unix domain socket at path as transport; connect() opens it."""
        def connect_sock():
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                s.connect(path)
            except BaseException:
                # Do not leak the descriptor if the connect fails.
                s.close()
                raise
            return s

        self._connect_sock = connect_sock

    def connect(self):
        """Open the connection if there is none and return the socket.

        A new connection gets text-mode reader/writer file objects, the
        protocol greeting is sent, and the mode the client was in before
        (re)connecting is re-established on the server.

        If any of that fails, the half-open connection is closed and the
        previous mode is kept recorded, so that a later attempt restores it
        again instead of silently staying in MODE_NORMAL.
        """
        if self._socket is None:
            cur_mode = self.mode
            try:
                self._socket = self._connect_sock()

                self.reader = self._socket.makefile('r', encoding='utf-8')
                self.writer = self._socket.makefile('w', encoding='utf-8')

                self.writer.write('OEHASHEQUIV 1.0\n\n')
                self.writer.flush()

                # A fresh connection starts in normal mode on the server;
                # replay the transition to the mode we were in.
                self.mode = self.MODE_NORMAL
                self._set_mode(cur_mode)
            except BaseException:
                self.close()
                self.mode = cur_mode
                raise

        return self._socket

    def close(self):
        """Close the connection (if any) and forget its file objects.

        The recorded mode is left untouched so that the next connect() can
        restore it.
        """
        # The file objects returned by makefile() keep the underlying socket
        # alive, so close them explicitly rather than waiting for garbage
        # collection. Flushing a dead connection may fail; that is expected
        # here and must not prevent the rest of the cleanup.
        for stream in (self.writer, self.reader):
            if stream is not None:
                try:
                    stream.close()
                except OSError:
                    pass

        if self._socket is not None:
            self._socket.close()
            self._socket = None

        self.reader = None
        self.writer = None

    def _send_wrapper(self, proc):
        """Run proc() on a live connection, reconnecting and retrying on error.

        proc is attempted up to four times (three retries). Socket errors,
        protocol errors and undecodable replies close the connection and
        trigger a retry; when the last attempt fails HashConnectionError is
        raised (other error types are wrapped in it and chained).
        """
        count = 0
        while True:
            try:
                self.connect()
                return proc()
            except (OSError, HashConnectionError, json.JSONDecodeError, UnicodeDecodeError) as e:
                logger.warning('Error talking to server: %s', e)
                if count >= 3:
                    if not isinstance(e, HashConnectionError):
                        raise HashConnectionError(str(e)) from e
                    raise
                self.close()
                count += 1

    def send_message(self, msg):
        """Send msg as one JSON line and return the decoded JSON reply line.

        Raises HashConnectionError if the server cannot be reached or keeps
        answering with a closed connection or a truncated/invalid line.
        """
        def proc():
            self.writer.write('%s\n' % json.dumps(msg))
            self.writer.flush()

            line = self.reader.readline()
            if not line:
                raise HashConnectionError('Connection closed')

            # readline() only returns a line without '\n' at end of stream,
            # i.e. the reply was cut short.
            if not line.endswith('\n'):
                raise HashConnectionError('Bad message %r' % line)

            return json.loads(line)

        return self._send_wrapper(proc)

    def send_stream(self, msg):
        """Send msg as one raw text line and return the reply line, stripped.

        Raises HashConnectionError if the server cannot be reached or keeps
        answering with a closed connection or a truncated line.
        """
        def proc():
            self.writer.write("%s\n" % msg)
            self.writer.flush()

            line = self.reader.readline()
            if not line:
                raise HashConnectionError('Connection closed')

            # Same check as send_message(): a reply without its newline was
            # truncated by the connection closing and must not be returned.
            if not line.endswith('\n'):
                raise HashConnectionError('Bad message %r' % line)

            return line.rstrip()

        return self._send_wrapper(proc)

    def _set_mode(self, new_mode):
        """Switch the connection to new_mode, telling the server if needed.

        Raises HashConnectionError if the server does not acknowledge the
        switch with 'ok', and Exception for a transition that is not defined.
        """
        if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
            r = self.send_stream('END')
            if r != 'ok':
                raise HashConnectionError('Bad response from server %r' % r)
        elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
            r = self.send_message({'get-stream': None})
            if r != 'ok':
                raise HashConnectionError('Bad response from server %r' % r)
        elif new_mode != self.mode:
            raise Exception('Undefined mode transition %r -> %r' % (self.mode, new_mode))

        self.mode = new_mode

    def get_unihash(self, method, taskhash):
        """Look up (method, taskhash) in stream mode.

        Returns the server's reply line, or None when the reply is empty.
        """
        self._set_mode(self.MODE_GET_STREAM)
        r = self.send_stream('%s %s' % (method, taskhash))
        if not r:
            return None
        return r

    def report_unihash(self, taskhash, method, outhash, unihash, extra=None):
        """Send a 'report' message and return the server's decoded reply.

        extra, if given, is a dict of additional fields for the report; it
        is copied, never modified. The four named arguments override any
        same-named keys in extra.
        """
        self._set_mode(self.MODE_NORMAL)
        m = {} if extra is None else extra.copy()
        m['taskhash'] = taskhash
        m['method'] = method
        m['outhash'] = outhash
        m['unihash'] = unihash
        return self.send_message({'report': m})

    def get_stats(self):
        """Send a 'get-stats' message and return the server's decoded reply."""
        self._set_mode(self.MODE_NORMAL)
        return self.send_message({'get-stats': None})

    def reset_stats(self):
        """Send a 'reset-stats' message and return the server's decoded reply."""
        self._set_mode(self.MODE_NORMAL)
        return self.send_message({'reset-stats': None})