1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#
import logging
import bb.asyncrpc
logger = logging.getLogger("BitBake.PRserv")
class PRAsyncClient(bb.asyncrpc.AsyncClient):
def __init__(self):
super().__init__('PRSERVICE', '1.0', logger)
async def getPR(self, version, pkgarch, checksum):
response = await self.send_message(
{'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}}
)
if response:
return response['value']
async def importone(self, version, pkgarch, checksum, value):
response = await self.send_message(
{'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}}
)
if response:
return response['value']
async def export(self, version, pkgarch, checksum, colinfo):
response = await self.send_message(
{'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}}
)
if response:
return (response['metainfo'], response['datainfo'])
async def is_readonly(self):
response = await self.send_message(
{'is-readonly': {}}
)
if response:
return response['readonly']
class PRClient(bb.asyncrpc.Client):
def __init__(self):
super().__init__()
self._add_methods('getPR', 'importone', 'export', 'is_readonly')
def _get_async_client(self):
return PRAsyncClient()
|