diff options
Diffstat (limited to 'subcmds/sync.py')
| -rw-r--r-- | subcmds/sync.py | 44 |
1 files changed, 32 insertions, 12 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py index 28568062..fb25c221 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
| @@ -358,7 +358,7 @@ later is required to fix a server side protocol bug. | |||
| 358 | optimized_fetch=opt.optimized_fetch, | 358 | optimized_fetch=opt.optimized_fetch, |
| 359 | retry_fetches=opt.retry_fetches, | 359 | retry_fetches=opt.retry_fetches, |
| 360 | prune=opt.prune, | 360 | prune=opt.prune, |
| 361 | ssh_proxy=True, | 361 | ssh_proxy=self.ssh_proxy, |
| 362 | clone_filter=self.manifest.CloneFilter, | 362 | clone_filter=self.manifest.CloneFilter, |
| 363 | partial_clone_exclude=self.manifest.PartialCloneExclude) | 363 | partial_clone_exclude=self.manifest.PartialCloneExclude) |
| 364 | 364 | ||
| @@ -380,7 +380,11 @@ later is required to fix a server side protocol bug. | |||
| 380 | finish = time.time() | 380 | finish = time.time() |
| 381 | return (success, project, start, finish) | 381 | return (success, project, start, finish) |
| 382 | 382 | ||
| 383 | def _Fetch(self, projects, opt, err_event): | 383 | @classmethod |
| 384 | def _FetchInitChild(cls, ssh_proxy): | ||
| 385 | cls.ssh_proxy = ssh_proxy | ||
| 386 | |||
| 387 | def _Fetch(self, projects, opt, err_event, ssh_proxy): | ||
| 384 | ret = True | 388 | ret = True |
| 385 | 389 | ||
| 386 | jobs = opt.jobs_network if opt.jobs_network else self.jobs | 390 | jobs = opt.jobs_network if opt.jobs_network else self.jobs |
| @@ -410,8 +414,14 @@ later is required to fix a server side protocol bug. | |||
| 410 | break | 414 | break |
| 411 | return ret | 415 | return ret |
| 412 | 416 | ||
| 417 | # We pass the ssh proxy settings via the class. This allows multiprocessing | ||
| 418 | # to pickle it up when spawning children. We can't pass it as an argument | ||
| 419 | # to _FetchProjectList below as multiprocessing is unable to pickle those. | ||
| 420 | Sync.ssh_proxy = None | ||
| 421 | |||
| 413 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 422 | # NB: Multiprocessing is heavy, so don't spin it up for one job. |
| 414 | if len(projects_list) == 1 or jobs == 1: | 423 | if len(projects_list) == 1 or jobs == 1: |
| 424 | self._FetchInitChild(ssh_proxy) | ||
| 415 | if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list): | 425 | if not _ProcessResults(self._FetchProjectList(opt, x) for x in projects_list): |
| 416 | ret = False | 426 | ret = False |
| 417 | else: | 427 | else: |
| @@ -429,7 +439,8 @@ later is required to fix a server side protocol bug. | |||
| 429 | else: | 439 | else: |
| 430 | pm.update(inc=0, msg='warming up') | 440 | pm.update(inc=0, msg='warming up') |
| 431 | chunksize = 4 | 441 | chunksize = 4 |
| 432 | with multiprocessing.Pool(jobs) as pool: | 442 | with multiprocessing.Pool( |
| 443 | jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)) as pool: | ||
| 433 | results = pool.imap_unordered( | 444 | results = pool.imap_unordered( |
| 434 | functools.partial(self._FetchProjectList, opt), | 445 | functools.partial(self._FetchProjectList, opt), |
| 435 | projects_list, | 446 | projects_list, |
| @@ -438,6 +449,11 @@ later is required to fix a server side protocol bug. | |||
| 438 | ret = False | 449 | ret = False |
| 439 | pool.close() | 450 | pool.close() |
| 440 | 451 | ||
| 452 | # Cleanup the reference now that we're done with it, and we're going to | ||
| 453 | # release any resources it points to. If we don't, later multiprocessing | ||
| 454 | # usage (e.g. checkouts) will try to pickle and then crash. | ||
| 455 | del Sync.ssh_proxy | ||
| 456 | |||
| 441 | pm.end() | 457 | pm.end() |
| 442 | self._fetch_times.Save() | 458 | self._fetch_times.Save() |
| 443 | 459 | ||
| @@ -447,7 +463,7 @@ later is required to fix a server side protocol bug. | |||
| 447 | return (ret, fetched) | 463 | return (ret, fetched) |
| 448 | 464 | ||
| 449 | def _FetchMain(self, opt, args, all_projects, err_event, manifest_name, | 465 | def _FetchMain(self, opt, args, all_projects, err_event, manifest_name, |
| 450 | load_local_manifests): | 466 | load_local_manifests, ssh_proxy): |
| 451 | """The main network fetch loop. | 467 | """The main network fetch loop. |
| 452 | 468 | ||
| 453 | Args: | 469 | Args: |
| @@ -457,6 +473,7 @@ later is required to fix a server side protocol bug. | |||
| 457 | err_event: Whether an error was hit while processing. | 473 | err_event: Whether an error was hit while processing. |
| 458 | manifest_name: Manifest file to be reloaded. | 474 | manifest_name: Manifest file to be reloaded. |
| 459 | load_local_manifests: Whether to load local manifests. | 475 | load_local_manifests: Whether to load local manifests. |
| 476 | ssh_proxy: SSH manager for clients & masters. | ||
| 460 | """ | 477 | """ |
| 461 | rp = self.manifest.repoProject | 478 | rp = self.manifest.repoProject |
| 462 | 479 | ||
| @@ -467,7 +484,7 @@ later is required to fix a server side protocol bug. | |||
| 467 | to_fetch.extend(all_projects) | 484 | to_fetch.extend(all_projects) |
| 468 | to_fetch.sort(key=self._fetch_times.Get, reverse=True) | 485 | to_fetch.sort(key=self._fetch_times.Get, reverse=True) |
| 469 | 486 | ||
| 470 | success, fetched = self._Fetch(to_fetch, opt, err_event) | 487 | success, fetched = self._Fetch(to_fetch, opt, err_event, ssh_proxy) |
| 471 | if not success: | 488 | if not success: |
| 472 | err_event.set() | 489 | err_event.set() |
| 473 | 490 | ||
| @@ -498,7 +515,7 @@ later is required to fix a server side protocol bug. | |||
| 498 | if previously_missing_set == missing_set: | 515 | if previously_missing_set == missing_set: |
| 499 | break | 516 | break |
| 500 | previously_missing_set = missing_set | 517 | previously_missing_set = missing_set |
| 501 | success, new_fetched = self._Fetch(missing, opt, err_event) | 518 | success, new_fetched = self._Fetch(missing, opt, err_event, ssh_proxy) |
| 502 | if not success: | 519 | if not success: |
| 503 | err_event.set() | 520 | err_event.set() |
| 504 | fetched.update(new_fetched) | 521 | fetched.update(new_fetched) |
| @@ -985,12 +1002,15 @@ later is required to fix a server side protocol bug. | |||
| 985 | 1002 | ||
| 986 | self._fetch_times = _FetchTimes(self.manifest) | 1003 | self._fetch_times = _FetchTimes(self.manifest) |
| 987 | if not opt.local_only: | 1004 | if not opt.local_only: |
| 988 | try: | 1005 | with multiprocessing.Manager() as manager: |
| 989 | ssh.init() | 1006 | with ssh.ProxyManager(manager) as ssh_proxy: |
| 990 | self._FetchMain(opt, args, all_projects, err_event, manifest_name, | 1007 | # Initialize the socket dir once in the parent. |
| 991 | load_local_manifests) | 1008 | ssh_proxy.sock() |
| 992 | finally: | 1009 | self._FetchMain(opt, args, all_projects, err_event, manifest_name, |
| 993 | ssh.close() | 1010 | load_local_manifests, ssh_proxy) |
| 1011 | |||
| 1012 | if opt.network_only: | ||
| 1013 | return | ||
| 994 | 1014 | ||
| 995 | # If we saw an error, exit with code 1 so that other scripts can check. | 1015 | # If we saw an error, exit with code 1 so that other scripts can check. |
| 996 | if err_event.is_set(): | 1016 | if err_event.is_set(): |
