summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/client.py
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2023-11-03 08:26:19 -0600
committerRichard Purdie <richard.purdie@linuxfoundation.org>2023-11-09 17:33:02 +0000
commit8f8501ed403dec27acbe780b936bc087fc5006d0 (patch)
tree60e6415075c7c71eacec23ca7dda53e4a324b12e /bitbake/lib/hashserv/client.py
parentf97b686884166dd77d1818e70615027c6ba8c348 (diff)
downloadpoky-8f8501ed403dec27acbe780b936bc087fc5006d0.tar.gz
bitbake: asyncrpc: Abstract sockets
Rewrites the asyncrpc client and server code to make it possible to have other transport backends that are not stream based (e.g. websockets which are message based). The connection handling classes are now shared between both the client and server to make it easier to implement new transport mechanisms (Bitbake rev: 2aaeae53696e4c2f13a169830c3b7089cbad6eca) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv/client.py')
-rw-r--r--bitbake/lib/hashserv/client.py38
1 files changed, 19 insertions, 19 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index f676d267fa..5f7d22ab13 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -28,24 +28,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
28 28
29 async def send_stream(self, msg): 29 async def send_stream(self, msg):
30 async def proc(): 30 async def proc():
31 self.writer.write(("%s\n" % msg).encode("utf-8")) 31 await self.socket.send(msg)
32 await self.writer.drain() 32 return await self.socket.recv()
33 l = await self.reader.readline()
34 if not l:
35 raise ConnectionError("Connection closed")
36 return l.decode("utf-8").rstrip()
37 33
38 return await self._send_wrapper(proc) 34 return await self._send_wrapper(proc)
39 35
40 async def _set_mode(self, new_mode): 36 async def _set_mode(self, new_mode):
37 async def stream_to_normal():
38 await self.socket.send("END")
39 return await self.socket.recv()
40
41 if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: 41 if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
42 r = await self.send_stream("END") 42 r = await self._send_wrapper(stream_to_normal)
43 if r != "ok": 43 if r != "ok":
44 raise ConnectionError("Bad response from server %r" % r) 44 raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r)
45 elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: 45 elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
46 r = await self.send_message({"get-stream": None}) 46 r = await self.invoke({"get-stream": None})
47 if r != "ok": 47 if r != "ok":
48 raise ConnectionError("Bad response from server %r" % r) 48 raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r)
49 elif new_mode != self.mode: 49 elif new_mode != self.mode:
50 raise Exception( 50 raise Exception(
51 "Undefined mode transition %r -> %r" % (self.mode, new_mode) 51 "Undefined mode transition %r -> %r" % (self.mode, new_mode)
@@ -67,7 +67,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
67 m["method"] = method 67 m["method"] = method
68 m["outhash"] = outhash 68 m["outhash"] = outhash
69 m["unihash"] = unihash 69 m["unihash"] = unihash
70 return await self.send_message({"report": m}) 70 return await self.invoke({"report": m})
71 71
72 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): 72 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}):
73 await self._set_mode(self.MODE_NORMAL) 73 await self._set_mode(self.MODE_NORMAL)
@@ -75,39 +75,39 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
75 m["taskhash"] = taskhash 75 m["taskhash"] = taskhash
76 m["method"] = method 76 m["method"] = method
77 m["unihash"] = unihash 77 m["unihash"] = unihash
78 return await self.send_message({"report-equiv": m}) 78 return await self.invoke({"report-equiv": m})
79 79
80 async def get_taskhash(self, method, taskhash, all_properties=False): 80 async def get_taskhash(self, method, taskhash, all_properties=False):
81 await self._set_mode(self.MODE_NORMAL) 81 await self._set_mode(self.MODE_NORMAL)
82 return await self.send_message( 82 return await self.invoke(
83 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} 83 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
84 ) 84 )
85 85
86 async def get_outhash(self, method, outhash, taskhash, with_unihash=True): 86 async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
87 await self._set_mode(self.MODE_NORMAL) 87 await self._set_mode(self.MODE_NORMAL)
88 return await self.send_message( 88 return await self.invoke(
89 {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} 89 {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}}
90 ) 90 )
91 91
92 async def get_stats(self): 92 async def get_stats(self):
93 await self._set_mode(self.MODE_NORMAL) 93 await self._set_mode(self.MODE_NORMAL)
94 return await self.send_message({"get-stats": None}) 94 return await self.invoke({"get-stats": None})
95 95
96 async def reset_stats(self): 96 async def reset_stats(self):
97 await self._set_mode(self.MODE_NORMAL) 97 await self._set_mode(self.MODE_NORMAL)
98 return await self.send_message({"reset-stats": None}) 98 return await self.invoke({"reset-stats": None})
99 99
100 async def backfill_wait(self): 100 async def backfill_wait(self):
101 await self._set_mode(self.MODE_NORMAL) 101 await self._set_mode(self.MODE_NORMAL)
102 return (await self.send_message({"backfill-wait": None}))["tasks"] 102 return (await self.invoke({"backfill-wait": None}))["tasks"]
103 103
104 async def remove(self, where): 104 async def remove(self, where):
105 await self._set_mode(self.MODE_NORMAL) 105 await self._set_mode(self.MODE_NORMAL)
106 return await self.send_message({"remove": {"where": where}}) 106 return await self.invoke({"remove": {"where": where}})
107 107
108 async def clean_unused(self, max_age): 108 async def clean_unused(self, max_age):
109 await self._set_mode(self.MODE_NORMAL) 109 await self._set_mode(self.MODE_NORMAL)
110 return await self.send_message({"clean-unused": {"max_age_seconds": max_age}}) 110 return await self.invoke({"clean-unused": {"max_age_seconds": max_age}})
111 111
112 112
113class Client(bb.asyncrpc.Client): 113class Client(bb.asyncrpc.Client):