diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2021-08-19 12:46:41 -0400 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2021-08-23 08:30:54 +0100 |
commit | fdc908f321da852856a07c19e3d9a33ccb273dda (patch) | |
tree | a26f6e925cb1a03be33c9817ac88abe742b43564 /bitbake/lib | |
parent | e8182a794da04212e8815c1cb6d01f4f7744424f (diff) | |
download | poky-fdc908f321da852856a07c19e3d9a33ccb273dda.tar.gz |
bitbake: bitbake: asyncrpc: Defer all asyncio to child process
Reworks the async I/O API so that the async loop is only created in the
child process. This requires deferring the creation of the server until
the child process and a queue to transfer the bound address back to the
parent process
(Bitbake rev: 8555869cde39f9e9a9ced5a3e5788209640f6d50)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
[small loop -> self.loop fix in serv.py]
Signed-off-by: Scott Murray <scott.murray@konsulko.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib')
-rw-r--r-- | bitbake/lib/bb/asyncrpc/serv.py | 118 | ||||
-rw-r--r-- | bitbake/lib/hashserv/server.py | 4 |
2 files changed, 74 insertions, 48 deletions
diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py index 4084f300df..45628698b6 100644 --- a/bitbake/lib/bb/asyncrpc/serv.py +++ b/bitbake/lib/bb/asyncrpc/serv.py | |||
@@ -131,53 +131,58 @@ class AsyncServerConnection(object): | |||
131 | 131 | ||
132 | 132 | ||
133 | class AsyncServer(object): | 133 | class AsyncServer(object): |
134 | def __init__(self, logger, loop=None): | 134 | def __init__(self, logger): |
135 | if loop is None: | ||
136 | self.loop = asyncio.new_event_loop() | ||
137 | self.close_loop = True | ||
138 | else: | ||
139 | self.loop = loop | ||
140 | self.close_loop = False | ||
141 | |||
142 | self._cleanup_socket = None | 135 | self._cleanup_socket = None |
143 | self.logger = logger | 136 | self.logger = logger |
137 | self.start = None | ||
138 | self.address = None | ||
139 | |||
140 | @property | ||
141 | def loop(self): | ||
142 | return asyncio.get_event_loop() | ||
144 | 143 | ||
145 | def start_tcp_server(self, host, port): | 144 | def start_tcp_server(self, host, port): |
146 | self.server = self.loop.run_until_complete( | 145 | def start_tcp(): |
147 | asyncio.start_server(self.handle_client, host, port, loop=self.loop) | 146 | self.server = self.loop.run_until_complete( |
148 | ) | 147 | asyncio.start_server(self.handle_client, host, port) |
149 | 148 | ) | |
150 | for s in self.server.sockets: | 149 | |
151 | self.logger.debug('Listening on %r' % (s.getsockname(),)) | 150 | for s in self.server.sockets: |
152 | # Newer python does this automatically. Do it manually here for | 151 | self.logger.debug('Listening on %r' % (s.getsockname(),)) |
153 | # maximum compatibility | 152 | # Newer python does this automatically. Do it manually here for |
154 | s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) | 153 | # maximum compatibility |
155 | s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) | 154 | s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) |
156 | 155 | s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) | |
157 | name = self.server.sockets[0].getsockname() | 156 | |
158 | if self.server.sockets[0].family == socket.AF_INET6: | 157 | name = self.server.sockets[0].getsockname() |
159 | self.address = "[%s]:%d" % (name[0], name[1]) | 158 | if self.server.sockets[0].family == socket.AF_INET6: |
160 | else: | 159 | self.address = "[%s]:%d" % (name[0], name[1]) |
161 | self.address = "%s:%d" % (name[0], name[1]) | 160 | else: |
161 | self.address = "%s:%d" % (name[0], name[1]) | ||
162 | |||
163 | self.start = start_tcp | ||
162 | 164 | ||
163 | def start_unix_server(self, path): | 165 | def start_unix_server(self, path): |
164 | def cleanup(): | 166 | def cleanup(): |
165 | os.unlink(path) | 167 | os.unlink(path) |
166 | 168 | ||
167 | cwd = os.getcwd() | 169 | def start_unix(): |
168 | try: | 170 | cwd = os.getcwd() |
169 | # Work around path length limits in AF_UNIX | 171 | try: |
170 | os.chdir(os.path.dirname(path)) | 172 | # Work around path length limits in AF_UNIX |
171 | self.server = self.loop.run_until_complete( | 173 | os.chdir(os.path.dirname(path)) |
172 | asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop) | 174 | self.server = self.loop.run_until_complete( |
173 | ) | 175 | asyncio.start_unix_server(self.handle_client, os.path.basename(path)) |
174 | finally: | 176 | ) |
175 | os.chdir(cwd) | 177 | finally: |
178 | os.chdir(cwd) | ||
176 | 179 | ||
177 | self.logger.debug('Listening on %r' % path) | 180 | self.logger.debug('Listening on %r' % path) |
178 | 181 | ||
179 | self._cleanup_socket = cleanup | 182 | self._cleanup_socket = cleanup |
180 | self.address = "unix://%s" % os.path.abspath(path) | 183 | self.address = "unix://%s" % os.path.abspath(path) |
184 | |||
185 | self.start = start_unix | ||
181 | 186 | ||
182 | @abc.abstractmethod | 187 | @abc.abstractmethod |
183 | def accept_client(self, reader, writer): | 188 | def accept_client(self, reader, writer): |
@@ -205,8 +210,7 @@ class AsyncServer(object): | |||
205 | self.logger.debug("Got exit signal") | 210 | self.logger.debug("Got exit signal") |
206 | self.loop.stop() | 211 | self.loop.stop() |
207 | 212 | ||
208 | def serve_forever(self): | 213 | def _serve_forever(self): |
209 | asyncio.set_event_loop(self.loop) | ||
210 | try: | 214 | try: |
211 | self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) | 215 | self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) |
212 | signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) | 216 | signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) |
@@ -217,28 +221,50 @@ class AsyncServer(object): | |||
217 | self.loop.run_until_complete(self.server.wait_closed()) | 221 | self.loop.run_until_complete(self.server.wait_closed()) |
218 | self.logger.debug('Server shutting down') | 222 | self.logger.debug('Server shutting down') |
219 | finally: | 223 | finally: |
220 | if self.close_loop: | ||
221 | if sys.version_info >= (3, 6): | ||
222 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | ||
223 | self.loop.close() | ||
224 | |||
225 | if self._cleanup_socket is not None: | 224 | if self._cleanup_socket is not None: |
226 | self._cleanup_socket() | 225 | self._cleanup_socket() |
227 | 226 | ||
227 | def serve_forever(self): | ||
228 | """ | ||
229 | Serve requests in the current process | ||
230 | """ | ||
231 | self.start() | ||
232 | self._serve_forever() | ||
233 | |||
228 | def serve_as_process(self, *, prefunc=None, args=()): | 234 | def serve_as_process(self, *, prefunc=None, args=()): |
229 | def run(): | 235 | """ |
236 | Serve requests in a child process | ||
237 | """ | ||
238 | def run(queue): | ||
239 | try: | ||
240 | self.start() | ||
241 | finally: | ||
242 | queue.put(self.address) | ||
243 | queue.close() | ||
244 | |||
230 | if prefunc is not None: | 245 | if prefunc is not None: |
231 | prefunc(self, *args) | 246 | prefunc(self, *args) |
232 | self.serve_forever() | 247 | |
248 | self._serve_forever() | ||
249 | |||
250 | if sys.version_info >= (3, 6): | ||
251 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | ||
252 | self.loop.close() | ||
253 | |||
254 | queue = multiprocessing.Queue() | ||
233 | 255 | ||
234 | # Temporarily block SIGTERM. The server process will inherit this | 256 | # Temporarily block SIGTERM. The server process will inherit this |
235 | # block which will ensure it doesn't receive the SIGTERM until the | 257 | # block which will ensure it doesn't receive the SIGTERM until the |
236 | # handler is ready for it | 258 | # handler is ready for it |
237 | mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM]) | 259 | mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM]) |
238 | try: | 260 | try: |
239 | self.process = multiprocessing.Process(target=run) | 261 | self.process = multiprocessing.Process(target=run, args=(queue,)) |
240 | self.process.start() | 262 | self.process.start() |
241 | 263 | ||
264 | self.address = queue.get() | ||
265 | queue.close() | ||
266 | queue.join_thread() | ||
267 | |||
242 | return self.process | 268 | return self.process |
243 | finally: | 269 | finally: |
244 | signal.pthread_sigmask(signal.SIG_SETMASK, mask) | 270 | signal.pthread_sigmask(signal.SIG_SETMASK, mask) |
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 8e84989737..a059e52115 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
@@ -410,11 +410,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
410 | 410 | ||
411 | 411 | ||
412 | class Server(bb.asyncrpc.AsyncServer): | 412 | class Server(bb.asyncrpc.AsyncServer): |
413 | def __init__(self, db, loop=None, upstream=None, read_only=False): | 413 | def __init__(self, db, upstream=None, read_only=False): |
414 | if upstream and read_only: | 414 | if upstream and read_only: |
415 | raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server") | 415 | raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server") |
416 | 416 | ||
417 | super().__init__(logger, loop) | 417 | super().__init__(logger) |
418 | 418 | ||
419 | self.request_stats = Stats() | 419 | self.request_stats = Stats() |
420 | self.db = db | 420 | self.db = db |