summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/prserv
diff options
context:
space:
mode:
authorTudor Florea <tudor.florea@enea.com>2015-10-09 22:59:03 +0200
committerTudor Florea <tudor.florea@enea.com>2015-10-09 22:59:03 +0200
commit972dcfcdbfe75dcfeb777150c136576cf1a71e99 (patch)
tree97a61cd7e293d7ae9d56ef7ed0f81253365bb026 /bitbake/lib/prserv
downloadpoky-972dcfcdbfe75dcfeb777150c136576cf1a71e99.tar.gz
initial commit for Enea Linux 5.0 arm
Signed-off-by: Tudor Florea <tudor.florea@enea.com>
Diffstat (limited to 'bitbake/lib/prserv')
-rw-r--r--bitbake/lib/prserv/__init__.py14
-rw-r--r--bitbake/lib/prserv/db.py258
-rw-r--r--bitbake/lib/prserv/serv.py387
3 files changed, 659 insertions, 0 deletions
diff --git a/bitbake/lib/prserv/__init__.py b/bitbake/lib/prserv/__init__.py
new file mode 100644
index 0000000000..c3cb73ad92
--- /dev/null
+++ b/bitbake/lib/prserv/__init__.py
@@ -0,0 +1,14 @@
1__version__ = "1.0.0"
2
3import os, time
4import sys,logging
5
6def init_logger(logfile, loglevel):
7 numeric_level = getattr(logging, loglevel.upper(), None)
8 if not isinstance(numeric_level, int):
9 raise ValueError('Invalid log level: %s' % loglevel)
10 FORMAT = '%(asctime)-15s %(message)s'
11 logging.basicConfig(level=numeric_level, filename=logfile, format=FORMAT)
12
13class NotFoundError(Exception):
14 pass
diff --git a/bitbake/lib/prserv/db.py b/bitbake/lib/prserv/db.py
new file mode 100644
index 0000000000..9d6d11526a
--- /dev/null
+++ b/bitbake/lib/prserv/db.py
@@ -0,0 +1,258 @@
1import logging
2import os.path
3import errno
4import prserv
5import time
6
7try:
8 import sqlite3
9except ImportError:
10 from pysqlite2 import dbapi2 as sqlite3
11
12logger = logging.getLogger("BitBake.PRserv")
13
14sqlversion = sqlite3.sqlite_version_info
15if sqlversion[0] < 3 or (sqlversion[0] == 3 and sqlversion[1] < 3):
16 raise Exception("sqlite3 version 3.3.0 or later is required.")
17
18class PRTable(object):
19 def __init__(self, conn, table, nohist):
20 self.conn = conn
21 self.nohist = nohist
22 self.dirty = False
23 if nohist:
24 self.table = "%s_nohist" % table
25 else:
26 self.table = "%s_hist" % table
27
28 self._execute("CREATE TABLE IF NOT EXISTS %s \
29 (version TEXT NOT NULL, \
30 pkgarch TEXT NOT NULL, \
31 checksum TEXT NOT NULL, \
32 value INTEGER, \
33 PRIMARY KEY (version, pkgarch, checksum));" % self.table)
34
35 def _execute(self, *query):
36 """Execute a query, waiting to acquire a lock if necessary"""
37 start = time.time()
38 end = start + 20
39 while True:
40 try:
41 return self.conn.execute(*query)
42 except sqlite3.OperationalError as exc:
43 if 'is locked' in str(exc) and end > time.time():
44 continue
45 raise exc
46
47 def sync(self):
48 self.conn.commit()
49 self._execute("BEGIN EXCLUSIVE TRANSACTION")
50
51 def sync_if_dirty(self):
52 if self.dirty:
53 self.sync()
54 self.dirty = False
55
56 def _getValueHist(self, version, pkgarch, checksum):
57 data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
58 (version, pkgarch, checksum))
59 row=data.fetchone()
60 if row != None:
61 return row[0]
62 else:
63 #no value found, try to insert
64 try:
65 self._execute("INSERT INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));"
66 % (self.table,self.table),
67 (version,pkgarch, checksum,version, pkgarch))
68 except sqlite3.IntegrityError as exc:
69 logger.error(str(exc))
70
71 self.dirty = True
72
73 data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
74 (version, pkgarch, checksum))
75 row=data.fetchone()
76 if row != None:
77 return row[0]
78 else:
79 raise prserv.NotFoundError
80
81 def _getValueNohist(self, version, pkgarch, checksum):
82 data=self._execute("SELECT value FROM %s \
83 WHERE version=? AND pkgarch=? AND checksum=? AND \
84 value >= (select max(value) from %s where version=? AND pkgarch=?);"
85 % (self.table, self.table),
86 (version, pkgarch, checksum, version, pkgarch))
87 row=data.fetchone()
88 if row != None:
89 return row[0]
90 else:
91 #no value found, try to insert
92 try:
93 self._execute("INSERT OR REPLACE INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));"
94 % (self.table,self.table),
95 (version, pkgarch, checksum, version, pkgarch))
96 except sqlite3.IntegrityError as exc:
97 logger.error(str(exc))
98 self.conn.rollback()
99
100 self.dirty = True
101
102 data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
103 (version, pkgarch, checksum))
104 row=data.fetchone()
105 if row != None:
106 return row[0]
107 else:
108 raise prserv.NotFoundError
109
110 def getValue(self, version, pkgarch, checksum):
111 if self.nohist:
112 return self._getValueNohist(version, pkgarch, checksum)
113 else:
114 return self._getValueHist(version, pkgarch, checksum)
115
116 def _importHist(self, version, pkgarch, checksum, value):
117 val = None
118 data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
119 (version, pkgarch, checksum))
120 row = data.fetchone()
121 if row != None:
122 val=row[0]
123 else:
124 #no value found, try to insert
125 try:
126 self._execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table),
127 (version, pkgarch, checksum, value))
128 except sqlite3.IntegrityError as exc:
129 logger.error(str(exc))
130
131 self.dirty = True
132
133 data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
134 (version, pkgarch, checksum))
135 row = data.fetchone()
136 if row != None:
137 val = row[0]
138 return val
139
140 def _importNohist(self, version, pkgarch, checksum, value):
141 try:
142 #try to insert
143 self._execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table),
144 (version, pkgarch, checksum,value))
145 except sqlite3.IntegrityError as exc:
146 #already have the record, try to update
147 try:
148 self._execute("UPDATE %s SET value=? WHERE version=? AND pkgarch=? AND checksum=? AND value<?"
149 % (self.table),
150 (value,version,pkgarch,checksum,value))
151 except sqlite3.IntegrityError as exc:
152 logger.error(str(exc))
153
154 self.dirty = True
155
156 data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=? AND value>=?;" % self.table,
157 (version,pkgarch,checksum,value))
158 row=data.fetchone()
159 if row != None:
160 return row[0]
161 else:
162 return None
163
164 def importone(self, version, pkgarch, checksum, value):
165 if self.nohist:
166 return self._importNohist(version, pkgarch, checksum, value)
167 else:
168 return self._importHist(version, pkgarch, checksum, value)
169
170 def export(self, version, pkgarch, checksum, colinfo):
171 metainfo = {}
172 #column info
173 if colinfo:
174 metainfo['tbl_name'] = self.table
175 metainfo['core_ver'] = prserv.__version__
176 metainfo['col_info'] = []
177 data = self._execute("PRAGMA table_info(%s);" % self.table)
178 for row in data:
179 col = {}
180 col['name'] = row['name']
181 col['type'] = row['type']
182 col['notnull'] = row['notnull']
183 col['dflt_value'] = row['dflt_value']
184 col['pk'] = row['pk']
185 metainfo['col_info'].append(col)
186
187 #data info
188 datainfo = []
189
190 if self.nohist:
191 sqlstmt = "SELECT T1.version, T1.pkgarch, T1.checksum, T1.value FROM %s as T1, \
192 (SELECT version,pkgarch,max(value) as maxvalue FROM %s GROUP BY version,pkgarch) as T2 \
193 WHERE T1.version=T2.version AND T1.pkgarch=T2.pkgarch AND T1.value=T2.maxvalue " % (self.table, self.table)
194 else:
195 sqlstmt = "SELECT * FROM %s as T1 WHERE 1=1 " % self.table
196 sqlarg = []
197 where = ""
198 if version:
199 where += "AND T1.version=? "
200 sqlarg.append(str(version))
201 if pkgarch:
202 where += "AND T1.pkgarch=? "
203 sqlarg.append(str(pkgarch))
204 if checksum:
205 where += "AND T1.checksum=? "
206 sqlarg.append(str(checksum))
207
208 sqlstmt += where + ";"
209
210 if len(sqlarg):
211 data = self._execute(sqlstmt, tuple(sqlarg))
212 else:
213 data = self._execute(sqlstmt)
214 for row in data:
215 if row['version']:
216 col = {}
217 col['version'] = row['version']
218 col['pkgarch'] = row['pkgarch']
219 col['checksum'] = row['checksum']
220 col['value'] = row['value']
221 datainfo.append(col)
222 return (metainfo, datainfo)
223
224class PRData(object):
225 """Object representing the PR database"""
226 def __init__(self, filename, nohist=True):
227 self.filename=os.path.abspath(filename)
228 self.nohist=nohist
229 #build directory hierarchy
230 try:
231 os.makedirs(os.path.dirname(self.filename))
232 except OSError as e:
233 if e.errno != errno.EEXIST:
234 raise e
235 self.connection=sqlite3.connect(self.filename, isolation_level="EXCLUSIVE", check_same_thread = False)
236 self.connection.row_factory=sqlite3.Row
237 self.connection.execute("pragma synchronous = off;")
238 self.connection.execute("PRAGMA journal_mode = WAL;")
239 self._tables={}
240
241 def __del__(self):
242 self.connection.close()
243
244 def __getitem__(self,tblname):
245 if not isinstance(tblname, basestring):
246 raise TypeError("tblname argument must be a string, not '%s'" %
247 type(tblname))
248 if tblname in self._tables:
249 return self._tables[tblname]
250 else:
251 tableobj = self._tables[tblname] = PRTable(self.connection, tblname, self.nohist)
252 return tableobj
253
254 def __delitem__(self, tblname):
255 if tblname in self._tables:
256 del self._tables[tblname]
257 logger.info("drop table %s" % (tblname))
258 self.connection.execute("DROP TABLE IF EXISTS %s;" % tblname)
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py
new file mode 100644
index 0000000000..25eb46a410
--- /dev/null
+++ b/bitbake/lib/prserv/serv.py
@@ -0,0 +1,387 @@
1import os,sys,logging
2import signal, time, atexit, threading
3from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
4import xmlrpclib
5import threading
6import Queue
7
8try:
9 import sqlite3
10except ImportError:
11 from pysqlite2 import dbapi2 as sqlite3
12
13import bb.server.xmlrpc
14import prserv
15import prserv.db
16import errno
17
18logger = logging.getLogger("BitBake.PRserv")
19
20if sys.hexversion < 0x020600F0:
21 print("Sorry, python 2.6 or later is required.")
22 sys.exit(1)
23
24class Handler(SimpleXMLRPCRequestHandler):
25 def _dispatch(self,method,params):
26 try:
27 value=self.server.funcs[method](*params)
28 except:
29 import traceback
30 traceback.print_exc()
31 raise
32 return value
33
34PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
35singleton = None
36
37
38class PRServer(SimpleXMLRPCServer):
39 def __init__(self, dbfile, logfile, interface, daemon=True):
40 ''' constructor '''
41 import socket
42 try:
43 SimpleXMLRPCServer.__init__(self, interface,
44 logRequests=False, allow_none=True)
45 except socket.error:
46 ip=socket.gethostbyname(interface[0])
47 port=interface[1]
48 msg="PR Server unable to bind to %s:%s\n" % (ip, port)
49 sys.stderr.write(msg)
50 raise PRServiceConfigError
51
52 self.dbfile=dbfile
53 self.daemon=daemon
54 self.logfile=logfile
55 self.working_thread=None
56 self.host, self.port = self.socket.getsockname()
57 self.pidfile=PIDPREFIX % (self.host, self.port)
58
59 self.register_function(self.getPR, "getPR")
60 self.register_function(self.quit, "quit")
61 self.register_function(self.ping, "ping")
62 self.register_function(self.export, "export")
63 self.register_function(self.importone, "importone")
64 self.register_introspection_functions()
65
66 self.db = prserv.db.PRData(self.dbfile)
67 self.table = self.db["PRMAIN"]
68
69 self.requestqueue = Queue.Queue()
70 self.handlerthread = threading.Thread(target = self.process_request_thread)
71 self.handlerthread.daemon = False
72
73 def process_request_thread(self):
74 """Same as in BaseServer but as a thread.
75
76 In addition, exception handling is done here.
77
78 """
79 iter_count = 1
80 # With 60 iterations between syncs and a 0.5 second timeout between
81 # iterations, this will sync if dirty every ~30 seconds.
82 iterations_between_sync = 60
83
84 while True:
85 (request, client_address) = self.requestqueue.get()
86 try:
87 self.finish_request(request, client_address)
88 self.shutdown_request(request)
89 iter_count = (iter_count + 1) % iterations_between_sync
90 if iter_count == 0:
91 self.table.sync_if_dirty()
92 except:
93 self.handle_error(request, client_address)
94 self.shutdown_request(request)
95 self.table.sync()
96
97 def process_request(self, request, client_address):
98 self.requestqueue.put((request, client_address))
99
100 def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
101 try:
102 return self.table.export(version, pkgarch, checksum, colinfo)
103 except sqlite3.Error as exc:
104 logger.error(str(exc))
105 return None
106
107 def importone(self, version, pkgarch, checksum, value):
108 return self.table.importone(version, pkgarch, checksum, value)
109
110 def ping(self):
111 return not self.quit
112
113 def getinfo(self):
114 return (self.host, self.port)
115
116 def getPR(self, version, pkgarch, checksum):
117 try:
118 return self.table.getValue(version, pkgarch, checksum)
119 except prserv.NotFoundError:
120 logger.error("can not find value for (%s, %s)",version, checksum)
121 return None
122 except sqlite3.Error as exc:
123 logger.error(str(exc))
124 return None
125
126 def quit(self):
127 self.quit=True
128 return
129
130 def work_forever(self,):
131 self.quit = False
132 self.timeout = 0.5
133
134 logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
135 (self.dbfile, self.host, self.port, str(os.getpid())))
136
137 self.handlerthread.start()
138 while not self.quit:
139 self.handle_request()
140
141 self.table.sync()
142 logger.info("PRServer: stopping...")
143 self.server_close()
144 return
145
146 def start(self):
147 pid = self.daemonize()
148 # Ensure both the parent sees this and the child from the work_forever log entry above
149 logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
150 (self.dbfile, self.host, self.port, str(pid)))
151
152 def delpid(self):
153 os.remove(self.pidfile)
154
155 def daemonize(self):
156 """
157 See Advanced Programming in the UNIX, Sec 13.3
158 """
159 try:
160 pid = os.fork()
161 if pid > 0:
162 os.waitpid(pid, 0)
163 #parent return instead of exit to give control
164 return pid
165 except OSError as e:
166 raise Exception("%s [%d]" % (e.strerror, e.errno))
167
168 os.setsid()
169 """
170 fork again to make sure the daemon is not session leader,
171 which prevents it from acquiring controlling terminal
172 """
173 try:
174 pid = os.fork()
175 if pid > 0: #parent
176 os._exit(0)
177 except OSError as e:
178 raise Exception("%s [%d]" % (e.strerror, e.errno))
179
180 os.umask(0)
181 os.chdir("/")
182
183 sys.stdout.flush()
184 sys.stderr.flush()
185 si = file('/dev/null', 'r')
186 so = file(self.logfile, 'a+')
187 se = so
188 os.dup2(si.fileno(),sys.stdin.fileno())
189 os.dup2(so.fileno(),sys.stdout.fileno())
190 os.dup2(se.fileno(),sys.stderr.fileno())
191
192 # Clear out all log handlers prior to the fork() to avoid calling
193 # event handlers not part of the PRserver
194 for logger_iter in logging.Logger.manager.loggerDict.keys():
195 logging.getLogger(logger_iter).handlers = []
196
197 # Ensure logging makes it to the logfile
198 streamhandler = logging.StreamHandler()
199 streamhandler.setLevel(logging.DEBUG)
200 formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
201 streamhandler.setFormatter(formatter)
202 logger.addHandler(streamhandler)
203
204 # write pidfile
205 pid = str(os.getpid())
206 pf = file(self.pidfile, 'w')
207 pf.write("%s\n" % pid)
208 pf.close()
209
210 self.work_forever()
211 self.delpid()
212 os._exit(0)
213
214class PRServSingleton(object):
215 def __init__(self, dbfile, logfile, interface):
216 self.dbfile = dbfile
217 self.logfile = logfile
218 self.interface = interface
219 self.host = None
220 self.port = None
221
222 def start(self):
223 self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
224 self.prserv.start()
225 self.host, self.port = self.prserv.getinfo()
226
227 def getinfo(self):
228 return (self.host, self.port)
229
230class PRServerConnection(object):
231 def __init__(self, host, port):
232 if is_local_special(host, port):
233 host, port = singleton.getinfo()
234 self.host = host
235 self.port = port
236 self.connection, self.transport = bb.server.xmlrpc._create_server(self.host, self.port)
237
238 def terminate(self):
239 try:
240 logger.info("Terminating PRServer...")
241 self.connection.quit()
242 except Exception as exc:
243 sys.stderr.write("%s\n" % str(exc))
244
245 def getPR(self, version, pkgarch, checksum):
246 return self.connection.getPR(version, pkgarch, checksum)
247
248 def ping(self):
249 return self.connection.ping()
250
251 def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
252 return self.connection.export(version, pkgarch, checksum, colinfo)
253
254 def importone(self, version, pkgarch, checksum, value):
255 return self.connection.importone(version, pkgarch, checksum, value)
256
257 def getinfo(self):
258 return self.host, self.port
259
260def start_daemon(dbfile, host, port, logfile):
261 pidfile = PIDPREFIX % (host, port)
262 try:
263 pf = file(pidfile,'r')
264 pid = int(pf.readline().strip())
265 pf.close()
266 except IOError:
267 pid = None
268
269 if pid:
270 sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
271 % pidfile)
272 return 1
273
274 server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (host,port))
275 server.start()
276 return 0
277
278def stop_daemon(host, port):
279 pidfile = PIDPREFIX % (host, port)
280 try:
281 pf = file(pidfile,'r')
282 pid = int(pf.readline().strip())
283 pf.close()
284 except IOError:
285 pid = None
286
287 if not pid:
288 sys.stderr.write("pidfile %s does not exist. Daemon not running?\n"
289 % pidfile)
290
291 try:
292 PRServerConnection(host, port).terminate()
293 except:
294 logger.critical("Stop PRService %s:%d failed" % (host,port))
295
296 try:
297 if pid:
298 wait_timeout = 0
299 print("Waiting for pr-server to exit.")
300 while is_running(pid) and wait_timeout < 50:
301 time.sleep(0.1)
302 wait_timeout += 1
303
304 if is_running(pid):
305 print("Sending SIGTERM to pr-server.")
306 os.kill(pid,signal.SIGTERM)
307 time.sleep(0.1)
308
309 if os.path.exists(pidfile):
310 os.remove(pidfile)
311
312 except OSError as e:
313 err = str(e)
314 if err.find("No such process") <= 0:
315 raise e
316
317 return 0
318
319def is_running(pid):
320 try:
321 os.kill(pid, 0)
322 except OSError as err:
323 if err.errno == errno.ESRCH:
324 return False
325 return True
326
327def is_local_special(host, port):
328 if host.strip().upper() == 'localhost'.upper() and (not port):
329 return True
330 else:
331 return False
332
333class PRServiceConfigError(Exception):
334 pass
335
336def auto_start(d):
337 global singleton
338
339 host_params = filter(None, (d.getVar('PRSERV_HOST', True) or '').split(':'))
340 if not host_params:
341 return None
342
343 if len(host_params) != 2:
344 logger.critical('\n'.join(['PRSERV_HOST: incorrect format',
345 'Usage: PRSERV_HOST = "<hostname>:<port>"']))
346 raise PRServiceConfigError
347
348 if is_local_special(host_params[0], int(host_params[1])) and not singleton:
349 import bb.utils
350 cachedir = (d.getVar("PERSISTENT_DIR", True) or d.getVar("CACHE", True))
351 if not cachedir:
352 logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
353 raise PRServiceConfigError
354 bb.utils.mkdirhier(cachedir)
355 dbfile = os.path.join(cachedir, "prserv.sqlite3")
356 logfile = os.path.join(cachedir, "prserv.log")
357 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0))
358 singleton.start()
359 if singleton:
360 host, port = singleton.getinfo()
361 else:
362 host = host_params[0]
363 port = int(host_params[1])
364
365 try:
366 connection = PRServerConnection(host,port)
367 connection.ping()
368 realhost, realport = connection.getinfo()
369 return str(realhost) + ":" + str(realport)
370
371 except Exception:
372 logger.critical("PRservice %s:%d not available" % (host, port))
373 raise PRServiceConfigError
374
375def auto_shutdown(d=None):
376 global singleton
377 if singleton:
378 host, port = singleton.getinfo()
379 try:
380 PRServerConnection(host, port).terminate()
381 except:
382 logger.critical("Stop PRService %s:%d failed" % (host,port))
383 singleton = None
384
385def ping(host, port):
386 conn=PRServerConnection(host, port)
387 return conn.ping()