summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/prserv/serv.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/prserv/serv.py')
-rw-r--r--bitbake/lib/prserv/serv.py140
1 files changed, 115 insertions, 25 deletions
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py
index dc4be5b620..e175886308 100644
--- a/bitbake/lib/prserv/serv.py
+++ b/bitbake/lib/prserv/serv.py
@@ -12,6 +12,7 @@ import sqlite3
12import prserv 12import prserv
13import prserv.db 13import prserv.db
14import errno 14import errno
15from . import create_async_client, revision_smaller, increase_revision
15import bb.asyncrpc 16import bb.asyncrpc
16 17
17logger = logging.getLogger("BitBake.PRserv") 18logger = logging.getLogger("BitBake.PRserv")
@@ -41,18 +42,16 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
41 try: 42 try:
42 return await super().dispatch_message(msg) 43 return await super().dispatch_message(msg)
43 except: 44 except:
44 self.server.table.sync()
45 raise 45 raise
46 else:
47 self.server.table.sync_if_dirty()
48 46
49 async def handle_test_pr(self, request): 47 async def handle_test_pr(self, request):
50 '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value''' 48 '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value'''
51 version = request["version"] 49 version = request["version"]
52 pkgarch = request["pkgarch"] 50 pkgarch = request["pkgarch"]
53 checksum = request["checksum"] 51 checksum = request["checksum"]
52 history = request["history"]
54 53
55 value = self.server.table.find_value(version, pkgarch, checksum) 54 value = self.server.table.find_value(version, pkgarch, checksum, history)
56 return {"value": value} 55 return {"value": value}
57 56
58 async def handle_test_package(self, request): 57 async def handle_test_package(self, request):
@@ -68,22 +67,110 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
68 version = request["version"] 67 version = request["version"]
69 pkgarch = request["pkgarch"] 68 pkgarch = request["pkgarch"]
70 69
71 value = self.server.table.find_max_value(version, pkgarch) 70 value = self.server.table.find_package_max_value(version, pkgarch)
72 return {"value": value} 71 return {"value": value}
73 72
74 async def handle_get_pr(self, request): 73 async def handle_get_pr(self, request):
75 version = request["version"] 74 version = request["version"]
76 pkgarch = request["pkgarch"] 75 pkgarch = request["pkgarch"]
77 checksum = request["checksum"] 76 checksum = request["checksum"]
77 history = request["history"]
78 78
79 response = None 79 if self.upstream_client is None:
80 try: 80 value = self.server.table.get_value(version, pkgarch, checksum, history)
81 value = self.server.table.get_value(version, pkgarch, checksum) 81 return {"value": value}
82 response = {"value": value}
83 except prserv.NotFoundError:
84 self.logger.error("failure storing value in database for (%s, %s)",version, checksum)
85 82
86 return response 83 # We have an upstream server.
84 # Check whether the local server already knows the requested configuration.
85 # If the configuration is a new one, the generated value we will add will
86 # depend on what's on the upstream server. That's why we're calling find_value()
87 # instead of get_value() directly.
88
89 value = self.server.table.find_value(version, pkgarch, checksum, history)
90 upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)
91
92 if value is not None:
93
94 # The configuration is already known locally.
95
96 if history:
97 value = self.server.table.get_value(version, pkgarch, checksum, history)
98 else:
99 existing_value = value
100 # In "no history", we need to make sure the value doesn't decrease
101 # and is at least greater than the maximum upstream value
102 # and the maximum local value
103
104 local_max = self.server.table.find_package_max_value(version, pkgarch)
105 if revision_smaller(value, local_max):
106 value = increase_revision(local_max)
107
108 if revision_smaller(value, upstream_max):
109 # Ask upstream whether it knows the checksum
110 upstream_value = await self.upstream_client.test_pr(version, pkgarch, checksum)
111 if upstream_value is None:
112 # Upstream doesn't have our checksum, let create a new one
113 value = upstream_max + ".0"
114 else:
115 # Fine to take the same value as upstream
116 value = upstream_max
117
118 if not value == existing_value and not self.server.read_only:
119 self.server.table.store_value(version, pkgarch, checksum, value)
120
121 return {"value": value}
122
123 # The configuration is a new one for the local server
124 # Let's ask the upstream server whether it knows it
125
126 known_upstream = await self.upstream_client.test_package(version, pkgarch)
127
128 if not known_upstream:
129
130 # The package is not known upstream, must be a local-only package
131 # Let's compute the PR number using the local-only method
132
133 value = self.server.table.get_value(version, pkgarch, checksum, history)
134 return {"value": value}
135
136 # The package is known upstream, let's ask the upstream server
137 # whether it knows our new output hash
138
139 value = await self.upstream_client.test_pr(version, pkgarch, checksum)
140
141 if value is not None:
142
143 # Upstream knows this output hash, let's store it and use it too.
144
145 if not self.server.read_only:
146 self.server.table.store_value(version, pkgarch, checksum, value)
147 # If the local server is read only, won't be able to store the new
148 # value in the database and will have to keep asking the upstream server
149 return {"value": value}
150
151 # The output hash doesn't exist upstream, get the most recent number from upstream (x)
152 # Then, we want to have a new PR value for the local server: x.y
153
154 upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)
155 # Here we know that the package is known upstream, so upstream_max can't be None
156 subvalue = self.server.table.find_new_subvalue(version, pkgarch, upstream_max)
157
158 if not self.server.read_only:
159 self.server.table.store_value(version, pkgarch, checksum, subvalue)
160
161 return {"value": subvalue}
162
163 async def process_requests(self):
164 if self.server.upstream is not None:
165 self.upstream_client = await create_async_client(self.server.upstream)
166 else:
167 self.upstream_client = None
168
169 try:
170 await super().process_requests()
171 finally:
172 if self.upstream_client is not None:
173 await self.upstream_client.close()
87 174
88 async def handle_import_one(self, request): 175 async def handle_import_one(self, request):
89 response = None 176 response = None
@@ -104,9 +191,10 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
104 pkgarch = request["pkgarch"] 191 pkgarch = request["pkgarch"]
105 checksum = request["checksum"] 192 checksum = request["checksum"]
106 colinfo = request["colinfo"] 193 colinfo = request["colinfo"]
194 history = request["history"]
107 195
108 try: 196 try:
109 (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo) 197 (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo, history)
110 except sqlite3.Error as exc: 198 except sqlite3.Error as exc:
111 self.logger.error(str(exc)) 199 self.logger.error(str(exc))
112 metainfo = datainfo = None 200 metainfo = datainfo = None
@@ -117,11 +205,12 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
117 return {"readonly": self.server.read_only} 205 return {"readonly": self.server.read_only}
118 206
119class PRServer(bb.asyncrpc.AsyncServer): 207class PRServer(bb.asyncrpc.AsyncServer):
120 def __init__(self, dbfile, read_only=False): 208 def __init__(self, dbfile, read_only=False, upstream=None):
121 super().__init__(logger) 209 super().__init__(logger)
122 self.dbfile = dbfile 210 self.dbfile = dbfile
123 self.table = None 211 self.table = None
124 self.read_only = read_only 212 self.read_only = read_only
213 self.upstream = upstream
125 214
126 def accept_client(self, socket): 215 def accept_client(self, socket):
127 return PRServerClient(socket, self) 216 return PRServerClient(socket, self)
@@ -134,27 +223,25 @@ class PRServer(bb.asyncrpc.AsyncServer):
134 self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % 223 self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
135 (self.dbfile, self.address, str(os.getpid()))) 224 (self.dbfile, self.address, str(os.getpid())))
136 225
226 if self.upstream is not None:
227 self.logger.info("And upstream PRServer: %s " % (self.upstream))
228
137 return tasks 229 return tasks
138 230
139 async def stop(self): 231 async def stop(self):
140 self.table.sync_if_dirty()
141 self.db.disconnect() 232 self.db.disconnect()
142 await super().stop() 233 await super().stop()
143 234
144 def signal_handler(self):
145 super().signal_handler()
146 if self.table:
147 self.table.sync()
148
149class PRServSingleton(object): 235class PRServSingleton(object):
150 def __init__(self, dbfile, logfile, host, port): 236 def __init__(self, dbfile, logfile, host, port, upstream):
151 self.dbfile = dbfile 237 self.dbfile = dbfile
152 self.logfile = logfile 238 self.logfile = logfile
153 self.host = host 239 self.host = host
154 self.port = port 240 self.port = port
241 self.upstream = upstream
155 242
156 def start(self): 243 def start(self):
157 self.prserv = PRServer(self.dbfile) 244 self.prserv = PRServer(self.dbfile, upstream=self.upstream)
158 self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) 245 self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
159 self.process = self.prserv.serve_as_process(log_level=logging.WARNING) 246 self.process = self.prserv.serve_as_process(log_level=logging.WARNING)
160 247
@@ -233,7 +320,7 @@ def run_as_daemon(func, pidfile, logfile):
233 os.remove(pidfile) 320 os.remove(pidfile)
234 os._exit(0) 321 os._exit(0)
235 322
236def start_daemon(dbfile, host, port, logfile, read_only=False): 323def start_daemon(dbfile, host, port, logfile, read_only=False, upstream=None):
237 ip = socket.gethostbyname(host) 324 ip = socket.gethostbyname(host)
238 pidfile = PIDPREFIX % (ip, port) 325 pidfile = PIDPREFIX % (ip, port)
239 try: 326 try:
@@ -249,7 +336,7 @@ def start_daemon(dbfile, host, port, logfile, read_only=False):
249 336
250 dbfile = os.path.abspath(dbfile) 337 dbfile = os.path.abspath(dbfile)
251 def daemon_main(): 338 def daemon_main():
252 server = PRServer(dbfile, read_only=read_only) 339 server = PRServer(dbfile, read_only=read_only, upstream=upstream)
253 server.start_tcp_server(ip, port) 340 server.start_tcp_server(ip, port)
254 server.serve_forever() 341 server.serve_forever()
255 342
@@ -336,6 +423,9 @@ def auto_start(d):
336 423
337 host = host_params[0].strip().lower() 424 host = host_params[0].strip().lower()
338 port = int(host_params[1]) 425 port = int(host_params[1])
426
427 upstream = d.getVar("PRSERV_UPSTREAM") or None
428
339 if is_local_special(host, port): 429 if is_local_special(host, port):
340 import bb.utils 430 import bb.utils
341 cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) 431 cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
@@ -350,7 +440,7 @@ def auto_start(d):
350 auto_shutdown() 440 auto_shutdown()
351 if not singleton: 441 if not singleton:
352 bb.utils.mkdirhier(cachedir) 442 bb.utils.mkdirhier(cachedir)
353 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port) 443 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port, upstream)
354 singleton.start() 444 singleton.start()
355 if singleton: 445 if singleton:
356 host = singleton.host 446 host = singleton.host