summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/prserv/serv.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/prserv/serv.py')
-rw-r--r--bitbake/lib/prserv/serv.py684
1 files changed, 327 insertions, 357 deletions
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py
index 25dcf8a0ee..e175886308 100644
--- a/bitbake/lib/prserv/serv.py
+++ b/bitbake/lib/prserv/serv.py
@@ -1,354 +1,326 @@
1# 1#
2# Copyright BitBake Contributors
3#
2# SPDX-License-Identifier: GPL-2.0-only 4# SPDX-License-Identifier: GPL-2.0-only
3# 5#
4 6
5import os,sys,logging 7import os,sys,logging
6import signal, time 8import signal, time
7from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
8import threading
9import queue
10import socket 9import socket
11import io 10import io
12import sqlite3 11import sqlite3
13import bb.server.xmlrpcclient
14import prserv 12import prserv
15import prserv.db 13import prserv.db
16import errno 14import errno
17import select 15from . import create_async_client, revision_smaller, increase_revision
16import bb.asyncrpc
18 17
19logger = logging.getLogger("BitBake.PRserv") 18logger = logging.getLogger("BitBake.PRserv")
20 19
21if sys.hexversion < 0x020600F0: 20PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
22 print("Sorry, python 2.6 or later is required.") 21singleton = None
23 sys.exit(1)
24 22
25class Handler(SimpleXMLRPCRequestHandler): 23class PRServerClient(bb.asyncrpc.AsyncServerConnection):
26 def _dispatch(self,method,params): 24 def __init__(self, socket, server):
25 super().__init__(socket, "PRSERVICE", server.logger)
26 self.server = server
27
28 self.handlers.update({
29 "get-pr": self.handle_get_pr,
30 "test-pr": self.handle_test_pr,
31 "test-package": self.handle_test_package,
32 "max-package-pr": self.handle_max_package_pr,
33 "import-one": self.handle_import_one,
34 "export": self.handle_export,
35 "is-readonly": self.handle_is_readonly,
36 })
37
38 def validate_proto_version(self):
39 return (self.proto_version == (1, 0))
40
41 async def dispatch_message(self, msg):
27 try: 42 try:
28 value=self.server.funcs[method](*params) 43 return await super().dispatch_message(msg)
29 except: 44 except:
30 import traceback
31 traceback.print_exc()
32 raise 45 raise
33 return value
34 46
35PIDPREFIX = "/tmp/PRServer_%s_%s.pid" 47 async def handle_test_pr(self, request):
36singleton = None 48 '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value'''
49 version = request["version"]
50 pkgarch = request["pkgarch"]
51 checksum = request["checksum"]
52 history = request["history"]
37 53
54 value = self.server.table.find_value(version, pkgarch, checksum, history)
55 return {"value": value}
38 56
39class PRServer(SimpleXMLRPCServer): 57 async def handle_test_package(self, request):
40 def __init__(self, dbfile, logfile, interface, daemon=True): 58 '''Tells whether there are entries for (version, pkgarch) in the db. Returns True or False'''
41 ''' constructor ''' 59 version = request["version"]
42 try: 60 pkgarch = request["pkgarch"]
43 SimpleXMLRPCServer.__init__(self, interface,
44 logRequests=False, allow_none=True)
45 except socket.error:
46 ip=socket.gethostbyname(interface[0])
47 port=interface[1]
48 msg="PR Server unable to bind to %s:%s\n" % (ip, port)
49 sys.stderr.write(msg)
50 raise PRServiceConfigError
51 61
52 self.dbfile=dbfile 62 value = self.server.table.test_package(version, pkgarch)
53 self.daemon=daemon 63 return {"value": value}
54 self.logfile=logfile
55 self.working_thread=None
56 self.host, self.port = self.socket.getsockname()
57 self.pidfile=PIDPREFIX % (self.host, self.port)
58
59 self.register_function(self.getPR, "getPR")
60 self.register_function(self.quit, "quit")
61 self.register_function(self.ping, "ping")
62 self.register_function(self.export, "export")
63 self.register_function(self.dump_db, "dump_db")
64 self.register_function(self.importone, "importone")
65 self.register_introspection_functions()
66
67 self.quitpipein, self.quitpipeout = os.pipe()
68
69 self.requestqueue = queue.Queue()
70 self.handlerthread = threading.Thread(target = self.process_request_thread)
71 self.handlerthread.daemon = False
72
73 def process_request_thread(self):
74 """Same as in BaseServer but as a thread.
75
76 In addition, exception handling is done here.
77
78 """
79 iter_count = 1
80 # 60 iterations between syncs or sync if dirty every ~30 seconds
81 iterations_between_sync = 60
82
83 bb.utils.set_process_name("PRServ Handler")
84
85 while not self.quitflag:
86 try:
87 (request, client_address) = self.requestqueue.get(True, 30)
88 except queue.Empty:
89 self.table.sync_if_dirty()
90 continue
91 if request is None:
92 continue
93 try:
94 self.finish_request(request, client_address)
95 self.shutdown_request(request)
96 iter_count = (iter_count + 1) % iterations_between_sync
97 if iter_count == 0:
98 self.table.sync_if_dirty()
99 except:
100 self.handle_error(request, client_address)
101 self.shutdown_request(request)
102 self.table.sync()
103 self.table.sync_if_dirty()
104
105 def sigint_handler(self, signum, stack):
106 if self.table:
107 self.table.sync()
108
109 def sigterm_handler(self, signum, stack):
110 if self.table:
111 self.table.sync()
112 self.quit()
113 self.requestqueue.put((None, None))
114
115 def process_request(self, request, client_address):
116 self.requestqueue.put((request, client_address))
117
118 def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
119 try:
120 return self.table.export(version, pkgarch, checksum, colinfo)
121 except sqlite3.Error as exc:
122 logger.error(str(exc))
123 return None
124
125 def dump_db(self):
126 """
127 Returns a script (string) that reconstructs the state of the
128 entire database at the time this function is called. The script
129 language is defined by the backing database engine, which is a
130 function of server configuration.
131 Returns None if the database engine does not support dumping to
132 script or if some other error is encountered in processing.
133 """
134 buff = io.StringIO()
135 try:
136 self.table.sync()
137 self.table.dump_db(buff)
138 return buff.getvalue()
139 except Exception as exc:
140 logger.error(str(exc))
141 return None
142 finally:
143 buff.close()
144 64
145 def importone(self, version, pkgarch, checksum, value): 65 async def handle_max_package_pr(self, request):
146 return self.table.importone(version, pkgarch, checksum, value) 66 '''Finds the greatest PR value for (version, pkgarch) in the db. Returns None if no entry was found'''
67 version = request["version"]
68 pkgarch = request["pkgarch"]
147 69
148 def ping(self): 70 value = self.server.table.find_package_max_value(version, pkgarch)
149 return not self.quitflag 71 return {"value": value}
150 72
151 def getinfo(self): 73 async def handle_get_pr(self, request):
152 return (self.host, self.port) 74 version = request["version"]
75 pkgarch = request["pkgarch"]
76 checksum = request["checksum"]
77 history = request["history"]
153 78
154 def getPR(self, version, pkgarch, checksum): 79 if self.upstream_client is None:
155 try: 80 value = self.server.table.get_value(version, pkgarch, checksum, history)
156 return self.table.getValue(version, pkgarch, checksum) 81 return {"value": value}
157 except prserv.NotFoundError:
158 logger.error("can not find value for (%s, %s)",version, checksum)
159 return None
160 except sqlite3.Error as exc:
161 logger.error(str(exc))
162 return None
163
164 def quit(self):
165 self.quitflag=True
166 os.write(self.quitpipeout, b"q")
167 os.close(self.quitpipeout)
168 return
169
170 def work_forever(self,):
171 self.quitflag = False
172 # This timeout applies to the poll in TCPServer, we need the select
173 # below to wake on our quit pipe closing. We only ever call into handle_request
174 # if there is data there.
175 self.timeout = 0.01
176
177 bb.utils.set_process_name("PRServ")
178
179 # DB connection must be created after all forks
180 self.db = prserv.db.PRData(self.dbfile)
181 self.table = self.db["PRMAIN"]
182 82
183 logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" % 83 # We have an upstream server.
184 (self.dbfile, self.host, self.port, str(os.getpid()))) 84 # Check whether the local server already knows the requested configuration.
185 85 # If the configuration is a new one, the generated value we will add will
186 self.handlerthread.start() 86 # depend on what's on the upstream server. That's why we're calling find_value()
187 while not self.quitflag: 87 # instead of get_value() directly.
188 ready = select.select([self.fileno(), self.quitpipein], [], [], 30)
189 if self.quitflag:
190 break
191 if self.fileno() in ready[0]:
192 self.handle_request()
193 self.handlerthread.join()
194 self.db.disconnect()
195 logger.info("PRServer: stopping...")
196 self.server_close()
197 os.close(self.quitpipein)
198 return
199 88
200 def start(self): 89 value = self.server.table.find_value(version, pkgarch, checksum, history)
201 if self.daemon: 90 upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)
202 pid = self.daemonize()
203 else:
204 pid = self.fork()
205 self.pid = pid
206 91
207 # Ensure both the parent sees this and the child from the work_forever log entry above 92 if value is not None:
208 logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
209 (self.dbfile, self.host, self.port, str(pid)))
210 93
211 def delpid(self): 94 # The configuration is already known locally.
212 os.remove(self.pidfile)
213 95
214 def daemonize(self): 96 if history:
215 """ 97 value = self.server.table.get_value(version, pkgarch, checksum, history)
216 See Advanced Programming in the UNIX, Sec 13.3 98 else:
217 """ 99 existing_value = value
218 try: 100 # In "no history", we need to make sure the value doesn't decrease
219 pid = os.fork() 101 # and is at least greater than the maximum upstream value
220 if pid > 0: 102 # and the maximum local value
221 os.waitpid(pid, 0)
222 #parent return instead of exit to give control
223 return pid
224 except OSError as e:
225 raise Exception("%s [%d]" % (e.strerror, e.errno))
226
227 os.setsid()
228 """
229 fork again to make sure the daemon is not session leader,
230 which prevents it from acquiring controlling terminal
231 """
232 try:
233 pid = os.fork()
234 if pid > 0: #parent
235 os._exit(0)
236 except OSError as e:
237 raise Exception("%s [%d]" % (e.strerror, e.errno))
238 103
239 self.cleanup_handles() 104 local_max = self.server.table.find_package_max_value(version, pkgarch)
240 os._exit(0) 105 if revision_smaller(value, local_max):
106 value = increase_revision(local_max)
107
108 if revision_smaller(value, upstream_max):
109 # Ask upstream whether it knows the checksum
110 upstream_value = await self.upstream_client.test_pr(version, pkgarch, checksum)
111 if upstream_value is None:
112 # Upstream doesn't have our checksum, let create a new one
113 value = upstream_max + ".0"
114 else:
115 # Fine to take the same value as upstream
116 value = upstream_max
117
118 if not value == existing_value and not self.server.read_only:
119 self.server.table.store_value(version, pkgarch, checksum, value)
120
121 return {"value": value}
122
123 # The configuration is a new one for the local server
124 # Let's ask the upstream server whether it knows it
125
126 known_upstream = await self.upstream_client.test_package(version, pkgarch)
127
128 if not known_upstream:
129
130 # The package is not known upstream, must be a local-only package
131 # Let's compute the PR number using the local-only method
132
133 value = self.server.table.get_value(version, pkgarch, checksum, history)
134 return {"value": value}
135
136 # The package is known upstream, let's ask the upstream server
137 # whether it knows our new output hash
138
139 value = await self.upstream_client.test_pr(version, pkgarch, checksum)
140
141 if value is not None:
142
143 # Upstream knows this output hash, let's store it and use it too.
144
145 if not self.server.read_only:
146 self.server.table.store_value(version, pkgarch, checksum, value)
147 # If the local server is read only, won't be able to store the new
148 # value in the database and will have to keep asking the upstream server
149 return {"value": value}
150
151 # The output hash doesn't exist upstream, get the most recent number from upstream (x)
152 # Then, we want to have a new PR value for the local server: x.y
153
154 upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)
155 # Here we know that the package is known upstream, so upstream_max can't be None
156 subvalue = self.server.table.find_new_subvalue(version, pkgarch, upstream_max)
157
158 if not self.server.read_only:
159 self.server.table.store_value(version, pkgarch, checksum, subvalue)
160
161 return {"value": subvalue}
162
163 async def process_requests(self):
164 if self.server.upstream is not None:
165 self.upstream_client = await create_async_client(self.server.upstream)
166 else:
167 self.upstream_client = None
241 168
242 def fork(self):
243 try:
244 pid = os.fork()
245 if pid > 0:
246 self.socket.close() # avoid ResourceWarning in parent
247 return pid
248 except OSError as e:
249 raise Exception("%s [%d]" % (e.strerror, e.errno))
250
251 bb.utils.signal_on_parent_exit("SIGTERM")
252 self.cleanup_handles()
253 os._exit(0)
254
255 def cleanup_handles(self):
256 signal.signal(signal.SIGINT, self.sigint_handler)
257 signal.signal(signal.SIGTERM, self.sigterm_handler)
258 os.chdir("/")
259
260 sys.stdout.flush()
261 sys.stderr.flush()
262
263 # We could be called from a python thread with io.StringIO as
264 # stdout/stderr or it could be 'real' unix fd forking where we need
265 # to physically close the fds to prevent the program launching us from
266 # potentially hanging on a pipe. Handle both cases.
267 si = open('/dev/null', 'r')
268 try:
269 os.dup2(si.fileno(),sys.stdin.fileno())
270 except (AttributeError, io.UnsupportedOperation):
271 sys.stdin = si
272 so = open(self.logfile, 'a+')
273 try: 169 try:
274 os.dup2(so.fileno(),sys.stdout.fileno()) 170 await super().process_requests()
275 except (AttributeError, io.UnsupportedOperation): 171 finally:
276 sys.stdout = so 172 if self.upstream_client is not None:
173 await self.upstream_client.close()
174
175 async def handle_import_one(self, request):
176 response = None
177 if not self.server.read_only:
178 version = request["version"]
179 pkgarch = request["pkgarch"]
180 checksum = request["checksum"]
181 value = request["value"]
182
183 value = self.server.table.importone(version, pkgarch, checksum, value)
184 if value is not None:
185 response = {"value": value}
186
187 return response
188
189 async def handle_export(self, request):
190 version = request["version"]
191 pkgarch = request["pkgarch"]
192 checksum = request["checksum"]
193 colinfo = request["colinfo"]
194 history = request["history"]
195
277 try: 196 try:
278 os.dup2(so.fileno(),sys.stderr.fileno()) 197 (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo, history)
279 except (AttributeError, io.UnsupportedOperation): 198 except sqlite3.Error as exc:
280 sys.stderr = so 199 self.logger.error(str(exc))
281 200 metainfo = datainfo = None
282 # Clear out all log handlers prior to the fork() to avoid calling
283 # event handlers not part of the PRserver
284 for logger_iter in logging.Logger.manager.loggerDict.keys():
285 logging.getLogger(logger_iter).handlers = []
286
287 # Ensure logging makes it to the logfile
288 streamhandler = logging.StreamHandler()
289 streamhandler.setLevel(logging.DEBUG)
290 formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
291 streamhandler.setFormatter(formatter)
292 logger.addHandler(streamhandler)
293
294 # write pidfile
295 pid = str(os.getpid())
296 with open(self.pidfile, 'w') as pf:
297 pf.write("%s\n" % pid)
298
299 self.work_forever()
300 self.delpid()
301 201
302class PRServSingleton(object): 202 return {"metainfo": metainfo, "datainfo": datainfo}
303 def __init__(self, dbfile, logfile, interface): 203
204 async def handle_is_readonly(self, request):
205 return {"readonly": self.server.read_only}
206
207class PRServer(bb.asyncrpc.AsyncServer):
208 def __init__(self, dbfile, read_only=False, upstream=None):
209 super().__init__(logger)
304 self.dbfile = dbfile 210 self.dbfile = dbfile
305 self.logfile = logfile 211 self.table = None
306 self.interface = interface 212 self.read_only = read_only
307 self.host = None 213 self.upstream = upstream
308 self.port = None 214
215 def accept_client(self, socket):
216 return PRServerClient(socket, self)
309 217
310 def start(self): 218 def start(self):
311 self.prserv = PRServer(self.dbfile, self.logfile, self.interface, daemon=False) 219 tasks = super().start()
312 self.prserv.start() 220 self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only)
313 self.host, self.port = self.prserv.getinfo() 221 self.table = self.db["PRMAIN"]
222
223 self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
224 (self.dbfile, self.address, str(os.getpid())))
314 225
315 def getinfo(self): 226 if self.upstream is not None:
316 return (self.host, self.port) 227 self.logger.info("And upstream PRServer: %s " % (self.upstream))
317 228
318class PRServerConnection(object): 229 return tasks
319 def __init__(self, host, port): 230
320 if is_local_special(host, port): 231 async def stop(self):
321 host, port = singleton.getinfo() 232 self.db.disconnect()
233 await super().stop()
234
235class PRServSingleton(object):
236 def __init__(self, dbfile, logfile, host, port, upstream):
237 self.dbfile = dbfile
238 self.logfile = logfile
322 self.host = host 239 self.host = host
323 self.port = port 240 self.port = port
324 self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) 241 self.upstream = upstream
325
326 def terminate(self):
327 try:
328 logger.info("Terminating PRServer...")
329 self.connection.quit()
330 except Exception as exc:
331 sys.stderr.write("%s\n" % str(exc))
332 242
333 def getPR(self, version, pkgarch, checksum): 243 def start(self):
334 return self.connection.getPR(version, pkgarch, checksum) 244 self.prserv = PRServer(self.dbfile, upstream=self.upstream)
245 self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
246 self.process = self.prserv.serve_as_process(log_level=logging.WARNING)
335 247
336 def ping(self): 248 if not self.prserv.address:
337 return self.connection.ping() 249 raise PRServiceConfigError
250 if not self.port:
251 self.port = int(self.prserv.address.rsplit(":", 1)[1])
338 252
339 def export(self,version=None, pkgarch=None, checksum=None, colinfo=True): 253def run_as_daemon(func, pidfile, logfile):
340 return self.connection.export(version, pkgarch, checksum, colinfo) 254 """
255 See Advanced Programming in the UNIX, Sec 13.3
256 """
257 try:
258 pid = os.fork()
259 if pid > 0:
260 os.waitpid(pid, 0)
261 #parent return instead of exit to give control
262 return pid
263 except OSError as e:
264 raise Exception("%s [%d]" % (e.strerror, e.errno))
341 265
342 def dump_db(self): 266 os.setsid()
343 return self.connection.dump_db() 267 """
268 fork again to make sure the daemon is not session leader,
269 which prevents it from acquiring controlling terminal
270 """
271 try:
272 pid = os.fork()
273 if pid > 0: #parent
274 os._exit(0)
275 except OSError as e:
276 raise Exception("%s [%d]" % (e.strerror, e.errno))
344 277
345 def importone(self, version, pkgarch, checksum, value): 278 os.chdir("/")
346 return self.connection.importone(version, pkgarch, checksum, value)
347 279
348 def getinfo(self): 280 sys.stdout.flush()
349 return self.host, self.port 281 sys.stderr.flush()
350 282
351def start_daemon(dbfile, host, port, logfile): 283 # We could be called from a python thread with io.StringIO as
284 # stdout/stderr or it could be 'real' unix fd forking where we need
285 # to physically close the fds to prevent the program launching us from
286 # potentially hanging on a pipe. Handle both cases.
287 si = open("/dev/null", "r")
288 try:
289 os.dup2(si.fileno(), sys.stdin.fileno())
290 except (AttributeError, io.UnsupportedOperation):
291 sys.stdin = si
292 so = open(logfile, "a+")
293 try:
294 os.dup2(so.fileno(), sys.stdout.fileno())
295 except (AttributeError, io.UnsupportedOperation):
296 sys.stdout = so
297 try:
298 os.dup2(so.fileno(), sys.stderr.fileno())
299 except (AttributeError, io.UnsupportedOperation):
300 sys.stderr = so
301
302 # Clear out all log handlers prior to the fork() to avoid calling
303 # event handlers not part of the PRserver
304 for logger_iter in logging.Logger.manager.loggerDict.keys():
305 logging.getLogger(logger_iter).handlers = []
306
307 # Ensure logging makes it to the logfile
308 streamhandler = logging.StreamHandler()
309 streamhandler.setLevel(logging.DEBUG)
310 formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
311 streamhandler.setFormatter(formatter)
312 logger.addHandler(streamhandler)
313
314 # write pidfile
315 pid = str(os.getpid())
316 with open(pidfile, "w") as pf:
317 pf.write("%s\n" % pid)
318
319 func()
320 os.remove(pidfile)
321 os._exit(0)
322
323def start_daemon(dbfile, host, port, logfile, read_only=False, upstream=None):
352 ip = socket.gethostbyname(host) 324 ip = socket.gethostbyname(host)
353 pidfile = PIDPREFIX % (ip, port) 325 pidfile = PIDPREFIX % (ip, port)
354 try: 326 try:
@@ -362,15 +334,13 @@ def start_daemon(dbfile, host, port, logfile):
362 % pidfile) 334 % pidfile)
363 return 1 335 return 1
364 336
365 server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) 337 dbfile = os.path.abspath(dbfile)
366 server.start() 338 def daemon_main():
339 server = PRServer(dbfile, read_only=read_only, upstream=upstream)
340 server.start_tcp_server(ip, port)
341 server.serve_forever()
367 342
368 # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with 343 run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
369 # the one the server actually is listening, so at least warn the user about it
370 _,rport = server.getinfo()
371 if port != rport:
372 sys.stdout.write("Server is listening at port %s instead of %s\n"
373 % (rport,port))
374 return 0 344 return 0
375 345
376def stop_daemon(host, port): 346def stop_daemon(host, port):
@@ -388,37 +358,28 @@ def stop_daemon(host, port):
388 # so at least advise the user which ports the corresponding server is listening 358 # so at least advise the user which ports the corresponding server is listening
389 ports = [] 359 ports = []
390 portstr = "" 360 portstr = ""
391 for pf in glob.glob(PIDPREFIX % (ip,'*')): 361 for pf in glob.glob(PIDPREFIX % (ip, "*")):
392 bn = os.path.basename(pf) 362 bn = os.path.basename(pf)
393 root, _ = os.path.splitext(bn) 363 root, _ = os.path.splitext(bn)
394 ports.append(root.split('_')[-1]) 364 ports.append(root.split("_")[-1])
395 if len(ports): 365 if len(ports):
396 portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports)) 366 portstr = "Wrong port? Other ports listening at %s: %s" % (host, " ".join(ports))
397 367
398 sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n" 368 sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
399 % (pidfile,portstr)) 369 % (pidfile, portstr))
400 return 1 370 return 1
401 371
402 try: 372 try:
403 PRServerConnection(ip, port).terminate() 373 if is_running(pid):
404 except: 374 print("Sending SIGTERM to pr-server.")
405 logger.critical("Stop PRService %s:%d failed" % (host,port)) 375 os.kill(pid, signal.SIGTERM)
406 376 time.sleep(0.1)
407 try:
408 if pid:
409 wait_timeout = 0
410 print("Waiting for pr-server to exit.")
411 while is_running(pid) and wait_timeout < 50:
412 time.sleep(0.1)
413 wait_timeout += 1
414 377
415 if is_running(pid): 378 try:
416 print("Sending SIGTERM to pr-server.") 379 os.remove(pidfile)
417 os.kill(pid,signal.SIGTERM) 380 except FileNotFoundError:
418 time.sleep(0.1) 381 # The PID file might have been removed by the exiting process
419 382 pass
420 if os.path.exists(pidfile):
421 os.remove(pidfile)
422 383
423 except OSError as e: 384 except OSError as e:
424 err = str(e) 385 err = str(e)
@@ -436,7 +397,7 @@ def is_running(pid):
436 return True 397 return True
437 398
438def is_local_special(host, port): 399def is_local_special(host, port):
439 if host.strip().upper() == 'localhost'.upper() and (not port): 400 if (host == "localhost" or host == "127.0.0.1") and not port:
440 return True 401 return True
441 else: 402 else:
442 return False 403 return False
@@ -447,7 +408,7 @@ class PRServiceConfigError(Exception):
447def auto_start(d): 408def auto_start(d):
448 global singleton 409 global singleton
449 410
450 host_params = list(filter(None, (d.getVar('PRSERV_HOST') or '').split(':'))) 411 host_params = list(filter(None, (d.getVar("PRSERV_HOST") or "").split(":")))
451 if not host_params: 412 if not host_params:
452 # Shutdown any existing PR Server 413 # Shutdown any existing PR Server
453 auto_shutdown() 414 auto_shutdown()
@@ -456,11 +417,16 @@ def auto_start(d):
456 if len(host_params) != 2: 417 if len(host_params) != 2:
457 # Shutdown any existing PR Server 418 # Shutdown any existing PR Server
458 auto_shutdown() 419 auto_shutdown()
459 logger.critical('\n'.join(['PRSERV_HOST: incorrect format', 420 logger.critical("\n".join(["PRSERV_HOST: incorrect format",
460 'Usage: PRSERV_HOST = "<hostname>:<port>"'])) 421 'Usage: PRSERV_HOST = "<hostname>:<port>"']))
461 raise PRServiceConfigError 422 raise PRServiceConfigError
462 423
463 if is_local_special(host_params[0], int(host_params[1])): 424 host = host_params[0].strip().lower()
425 port = int(host_params[1])
426
427 upstream = d.getVar("PRSERV_UPSTREAM") or None
428
429 if is_local_special(host, port):
464 import bb.utils 430 import bb.utils
465 cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) 431 cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
466 if not cachedir: 432 if not cachedir:
@@ -474,39 +440,43 @@ def auto_start(d):
474 auto_shutdown() 440 auto_shutdown()
475 if not singleton: 441 if not singleton:
476 bb.utils.mkdirhier(cachedir) 442 bb.utils.mkdirhier(cachedir)
477 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0)) 443 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port, upstream)
478 singleton.start() 444 singleton.start()
479 if singleton: 445 if singleton:
480 host, port = singleton.getinfo() 446 host = singleton.host
481 else: 447 port = singleton.port
482 host = host_params[0]
483 port = int(host_params[1])
484 448
485 try: 449 try:
486 connection = PRServerConnection(host,port) 450 ping(host, port)
487 connection.ping() 451 return str(host) + ":" + str(port)
488 realhost, realport = connection.getinfo() 452
489 return str(realhost) + ":" + str(realport)
490
491 except Exception: 453 except Exception:
492 logger.critical("PRservice %s:%d not available" % (host, port)) 454 logger.critical("PRservice %s:%d not available" % (host, port))
493 raise PRServiceConfigError 455 raise PRServiceConfigError
494 456
495def auto_shutdown(): 457def auto_shutdown():
496 global singleton 458 global singleton
497 if singleton: 459 if singleton and singleton.process:
498 host, port = singleton.getinfo() 460 singleton.process.terminate()
499 try: 461 singleton.process.join()
500 PRServerConnection(host, port).terminate()
501 except:
502 logger.critical("Stop PRService %s:%d failed" % (host,port))
503
504 try:
505 os.waitpid(singleton.prserv.pid, 0)
506 except ChildProcessError:
507 pass
508 singleton = None 462 singleton = None
509 463
510def ping(host, port): 464def ping(host, port):
511 conn=PRServerConnection(host, port) 465 from . import client
512 return conn.ping() 466
467 with client.PRClient() as conn:
468 conn.connect_tcp(host, port)
469 return conn.ping()
470
471def connect(host, port):
472 from . import client
473
474 global singleton
475
476 if host.strip().lower() == "localhost" and not port:
477 host = "localhost"
478 port = singleton.port
479
480 conn = client.PRClient()
481 conn.connect_tcp(host, port)
482 return conn