diff options
Diffstat (limited to 'subcmds/forall.py')
| -rw-r--r-- | subcmds/forall.py | 90 | 
1 files changed, 48 insertions, 42 deletions
| diff --git a/subcmds/forall.py b/subcmds/forall.py index 287f2e04..e5fc9e80 100644 --- a/subcmds/forall.py +++ b/subcmds/forall.py | |||
| @@ -15,7 +15,6 @@ | |||
| 15 | import errno | 15 | import errno | 
| 16 | import functools | 16 | import functools | 
| 17 | import io | 17 | import io | 
| 18 | import multiprocessing | ||
| 19 | import os | 18 | import os | 
| 20 | import re | 19 | import re | 
| 21 | import signal | 20 | import signal | 
| @@ -26,7 +25,6 @@ from color import Coloring | |||
| 26 | from command import Command | 25 | from command import Command | 
| 27 | from command import DEFAULT_LOCAL_JOBS | 26 | from command import DEFAULT_LOCAL_JOBS | 
| 28 | from command import MirrorSafeCommand | 27 | from command import MirrorSafeCommand | 
| 29 | from command import WORKER_BATCH_SIZE | ||
| 30 | from error import ManifestInvalidRevisionError | 28 | from error import ManifestInvalidRevisionError | 
| 31 | from repo_logging import RepoLogger | 29 | from repo_logging import RepoLogger | 
| 32 | 30 | ||
| @@ -241,7 +239,6 @@ without iterating through the remaining projects. | |||
| 241 | cmd.insert(cmd.index(cn) + 1, "--color") | 239 | cmd.insert(cmd.index(cn) + 1, "--color") | 
| 242 | 240 | ||
| 243 | mirror = self.manifest.IsMirror | 241 | mirror = self.manifest.IsMirror | 
| 244 | rc = 0 | ||
| 245 | 242 | ||
| 246 | smart_sync_manifest_name = "smart_sync_override.xml" | 243 | smart_sync_manifest_name = "smart_sync_override.xml" | 
| 247 | smart_sync_manifest_path = os.path.join( | 244 | smart_sync_manifest_path = os.path.join( | 
| @@ -264,32 +261,41 @@ without iterating through the remaining projects. | |||
| 264 | 261 | ||
| 265 | os.environ["REPO_COUNT"] = str(len(projects)) | 262 | os.environ["REPO_COUNT"] = str(len(projects)) | 
| 266 | 263 | ||
| 264 | def _ProcessResults(_pool, _output, results): | ||
| 265 | rc = 0 | ||
| 266 | first = True | ||
| 267 | for r, output in results: | ||
| 268 | if output: | ||
| 269 | if first: | ||
| 270 | first = False | ||
| 271 | elif opt.project_header: | ||
| 272 | print() | ||
| 273 | # To simplify the DoWorkWrapper, take care of automatic | ||
| 274 | # newlines. | ||
| 275 | end = "\n" | ||
| 276 | if output[-1] == "\n": | ||
| 277 | end = "" | ||
| 278 | print(output, end=end) | ||
| 279 | rc = rc or r | ||
| 280 | if r != 0 and opt.abort_on_errors: | ||
| 281 | raise Exception("Aborting due to previous error") | ||
| 282 | return rc | ||
| 283 | |||
| 267 | try: | 284 | try: | 
| 268 | config = self.manifest.manifestProject.config | 285 | config = self.manifest.manifestProject.config | 
| 269 | with multiprocessing.Pool(opt.jobs, InitWorker) as pool: | 286 | with self.ParallelContext(): | 
| 270 | results_it = pool.imap( | 287 | self.get_parallel_context()["projects"] = projects | 
| 288 | rc = self.ExecuteInParallel( | ||
| 289 | opt.jobs, | ||
| 271 | functools.partial( | 290 | functools.partial( | 
| 272 | DoWorkWrapper, mirror, opt, cmd, shell, config | 291 | self.DoWorkWrapper, mirror, opt, cmd, shell, config | 
| 273 | ), | 292 | ), | 
| 274 | enumerate(projects), | 293 | range(len(projects)), | 
| 275 | chunksize=WORKER_BATCH_SIZE, | 294 | callback=_ProcessResults, | 
| 295 | ordered=True, | ||
| 296 | initializer=self.InitWorker, | ||
| 297 | chunksize=1, | ||
| 276 | ) | 298 | ) | 
| 277 | first = True | ||
| 278 | for r, output in results_it: | ||
| 279 | if output: | ||
| 280 | if first: | ||
| 281 | first = False | ||
| 282 | elif opt.project_header: | ||
| 283 | print() | ||
| 284 | # To simplify the DoWorkWrapper, take care of automatic | ||
| 285 | # newlines. | ||
| 286 | end = "\n" | ||
| 287 | if output[-1] == "\n": | ||
| 288 | end = "" | ||
| 289 | print(output, end=end) | ||
| 290 | rc = rc or r | ||
| 291 | if r != 0 and opt.abort_on_errors: | ||
| 292 | raise Exception("Aborting due to previous error") | ||
| 293 | except (KeyboardInterrupt, WorkerKeyboardInterrupt): | 299 | except (KeyboardInterrupt, WorkerKeyboardInterrupt): | 
| 294 | # Catch KeyboardInterrupt raised inside and outside of workers | 300 | # Catch KeyboardInterrupt raised inside and outside of workers | 
| 295 | rc = rc or errno.EINTR | 301 | rc = rc or errno.EINTR | 
| @@ -304,29 +310,29 @@ without iterating through the remaining projects. | |||
| 304 | if rc != 0: | 310 | if rc != 0: | 
| 305 | sys.exit(rc) | 311 | sys.exit(rc) | 
| 306 | 312 | ||
| 313 | @classmethod | ||
| 314 | def InitWorker(cls): | ||
| 315 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
| 307 | 316 | ||
| 308 | class WorkerKeyboardInterrupt(Exception): | 317 | @classmethod | 
| 309 | """Keyboard interrupt exception for worker processes.""" | 318 | def DoWorkWrapper(cls, mirror, opt, cmd, shell, config, project_idx): | 
| 310 | 319 | """A wrapper around the DoWork() method. | |
| 311 | |||
| 312 | def InitWorker(): | ||
| 313 | signal.signal(signal.SIGINT, signal.SIG_IGN) | ||
| 314 | 320 | ||
| 321 | Catch the KeyboardInterrupt exceptions here and re-raise them as a | ||
| 322 | different, ``Exception``-based exception to stop it flooding the console | ||
| 323 | with stacktraces and making the parent hang indefinitely. | ||
| 315 | 324 | ||
| 316 | def DoWorkWrapper(mirror, opt, cmd, shell, config, args): | 325 | """ | 
| 317 | """A wrapper around the DoWork() method. | 326 | project = cls.get_parallel_context()["projects"][project_idx] | 
| 327 | try: | ||
| 328 | return DoWork(project, mirror, opt, cmd, shell, project_idx, config) | ||
| 329 | except KeyboardInterrupt: | ||
| 330 | print("%s: Worker interrupted" % project.name) | ||
| 331 | raise WorkerKeyboardInterrupt() | ||
| 318 | 332 | ||
| 319 | Catch the KeyboardInterrupt exceptions here and re-raise them as a | ||
| 320 | different, ``Exception``-based exception to stop it flooding the console | ||
| 321 | with stacktraces and making the parent hang indefinitely. | ||
| 322 | 333 | ||
| 323 | """ | 334 | class WorkerKeyboardInterrupt(Exception): | 
| 324 | cnt, project = args | 335 | """Keyboard interrupt exception for worker processes.""" | 
| 325 | try: | ||
| 326 | return DoWork(project, mirror, opt, cmd, shell, cnt, config) | ||
| 327 | except KeyboardInterrupt: | ||
| 328 | print("%s: Worker interrupted" % project.name) | ||
| 329 | raise WorkerKeyboardInterrupt() | ||
| 330 | 336 | ||
| 331 | 337 | ||
| 332 | def DoWork(project, mirror, opt, cmd, shell, cnt, config): | 338 | def DoWork(project, mirror, opt, cmd, shell, cnt, config): | 
