diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2019-07-04 00:14:02 +0100 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2019-07-15 10:28:12 +0100 |
commit | cf829a5f660392bc83a9175ab4900b170496f349 (patch) | |
tree | 91f6183780a5508a5a1e8074e42c7e44c5276b23 | |
parent | 491c6049e06bf1cb1bb4413ba87ce171a3fcd6b4 (diff) | |
download | poky-cf829a5f660392bc83a9175ab4900b170496f349.tar.gz |
bitbake: runqueue: Merge the queues and execute setscene and normal tasks in parallel
This is the serious functionality change in this runqueue patch series of
changes.
Rather than two phases of execution, the scenequeue setscene phase, followed
by normal task exeuction, this change allows them to execute in parallel
together.
To do this we need to handle marking of tasks as covered/uncovered in a piecemeal
fashion on a task by task basis rather than in a single function.
The code will block normal task exeuction until any setcene task which could
cover that task is executed and its status is known. There is a slight
optimisation which could be possible here at the risk of races but that
doesn't seem worthwhile.
The state engine isn't entirely cleaned up in this commit (see FIXME) and
the setscenewhitelist functionality is broken by it (see following patches)
however its good enough to test with normal workflows.
(Bitbake rev: 58b3f0847cc2d47e76f74d59dcbbf78fe41b118b)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 169 |
1 files changed, 115 insertions, 54 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index c1c4fd1b81..aafb6ffa58 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -142,7 +142,7 @@ class RunQueueScheduler(object): | |||
142 | Return the id of the first task we find that is buildable | 142 | Return the id of the first task we find that is buildable |
143 | """ | 143 | """ |
144 | self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] | 144 | self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] |
145 | buildable = self.buildable | 145 | buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)] |
146 | if not buildable: | 146 | if not buildable: |
147 | return None | 147 | return None |
148 | 148 | ||
@@ -1454,25 +1454,18 @@ class RunQueue: | |||
1454 | 1454 | ||
1455 | # If we don't have any setscene functions, skip execution | 1455 | # If we don't have any setscene functions, skip execution |
1456 | if len(self.rqdata.runq_setscene_tids) == 0: | 1456 | if len(self.rqdata.runq_setscene_tids) == 0: |
1457 | self.rqdata.init_progress_reporter.finish() | 1457 | logger.info('No setscene tasks') |
1458 | self.state = runQueueRunInit | 1458 | for tid in self.rqdata.runtaskentries: |
1459 | else: | 1459 | if len(self.rqdata.runtaskentries[tid].depends) == 0: |
1460 | logger.info('Executing SetScene Tasks') | 1460 | self.rqexe.setbuildable(tid) |
1461 | self.state = runQueueSceneRun | 1461 | self.rqexe.tasks_notcovered.add(tid) |
1462 | 1462 | self.rqexe.sqdone = True | |
1463 | if self.state is runQueueSceneRun: | 1463 | logger.info('Executing Tasks') |
1464 | retval = self.rqexe.sq_execute() | ||
1465 | |||
1466 | if self.state is runQueueRunInit: | ||
1467 | if self.cooker.configuration.setsceneonly: | ||
1468 | self.state = runQueueComplete | ||
1469 | |||
1470 | if self.state is runQueueRunInit: | ||
1471 | logger.info("Executing RunQueue Tasks") | ||
1472 | start_runqueue_tasks(self.rqexe) | ||
1473 | self.state = runQueueRunning | 1464 | self.state = runQueueRunning |
1474 | 1465 | ||
1475 | if self.state is runQueueRunning: | 1466 | if self.state is runQueueRunning: |
1467 | retval = self.rqexe.sq_execute() | ||
1468 | # FIXME revtal | ||
1476 | retval = self.rqexe.execute() | 1469 | retval = self.rqexe.execute() |
1477 | 1470 | ||
1478 | if self.state is runQueueCleanUp: | 1471 | if self.state is runQueueCleanUp: |
@@ -1757,6 +1750,8 @@ class RunQueueExecute: | |||
1757 | 1750 | ||
1758 | self.stampcache = {} | 1751 | self.stampcache = {} |
1759 | 1752 | ||
1753 | self.sqdone = False | ||
1754 | |||
1760 | self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) | 1755 | self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) |
1761 | self.sq_stats = RunQueueStats(len(self.rqdata.runq_setscene_tids)) | 1756 | self.sq_stats = RunQueueStats(len(self.rqdata.runq_setscene_tids)) |
1762 | 1757 | ||
@@ -1772,12 +1767,12 @@ class RunQueueExecute: | |||
1772 | self.scenequeue_covered = set() | 1767 | self.scenequeue_covered = set() |
1773 | # List of tasks which are covered (including setscene ones) | 1768 | # List of tasks which are covered (including setscene ones) |
1774 | self.tasks_covered = set() | 1769 | self.tasks_covered = set() |
1770 | self.tasks_scenequeue_done = set() | ||
1775 | self.scenequeue_notcovered = set() | 1771 | self.scenequeue_notcovered = set() |
1772 | self.tasks_notcovered = set() | ||
1776 | self.scenequeue_notneeded = set() | 1773 | self.scenequeue_notneeded = set() |
1777 | 1774 | ||
1778 | if len(self.rqdata.runq_setscene_tids) > 0: | 1775 | self.coveredtopocess = set() |
1779 | self.sqdata = SQData() | ||
1780 | build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self) | ||
1781 | 1776 | ||
1782 | schedulers = self.get_schedulers() | 1777 | schedulers = self.get_schedulers() |
1783 | for scheduler in schedulers: | 1778 | for scheduler in schedulers: |
@@ -1789,6 +1784,10 @@ class RunQueueExecute: | |||
1789 | bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % | 1784 | bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % |
1790 | (self.scheduler, ", ".join(obj.name for obj in schedulers))) | 1785 | (self.scheduler, ", ".join(obj.name for obj in schedulers))) |
1791 | 1786 | ||
1787 | if len(self.rqdata.runq_setscene_tids) > 0: | ||
1788 | self.sqdata = SQData() | ||
1789 | build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self) | ||
1790 | |||
1792 | def runqueue_process_waitpid(self, task, status): | 1791 | def runqueue_process_waitpid(self, task, status): |
1793 | 1792 | ||
1794 | # self.build_stamps[pid] may not exist when use shared work directory. | 1793 | # self.build_stamps[pid] may not exist when use shared work directory. |
@@ -1951,6 +1950,9 @@ class RunQueueExecute: | |||
1951 | if process_setscenewhitelist(self.rq, self.rqdata, self.stampcache, self.sched, self): | 1950 | if process_setscenewhitelist(self.rq, self.rqdata, self.stampcache, self.sched, self): |
1952 | return True | 1951 | return True |
1953 | 1952 | ||
1953 | if self.cooker.configuration.setsceneonly: | ||
1954 | return True | ||
1955 | |||
1954 | self.rq.read_workers() | 1956 | self.rq.read_workers() |
1955 | 1957 | ||
1956 | if self.stats.total == 0: | 1958 | if self.stats.total == 0: |
@@ -2014,7 +2016,7 @@ class RunQueueExecute: | |||
2014 | if self.can_start_task(): | 2016 | if self.can_start_task(): |
2015 | return True | 2017 | return True |
2016 | 2018 | ||
2017 | if self.stats.active > 0: | 2019 | if self.stats.active > 0 or self.sq_stats.active > 0: |
2018 | self.rq.read_workers() | 2020 | self.rq.read_workers() |
2019 | return self.rq.active_fds() | 2021 | return self.rq.active_fds() |
2020 | 2022 | ||
@@ -2026,9 +2028,9 @@ class RunQueueExecute: | |||
2026 | for task in self.rqdata.runtaskentries: | 2028 | for task in self.rqdata.runtaskentries: |
2027 | if task not in self.runq_buildable: | 2029 | if task not in self.runq_buildable: |
2028 | logger.error("Task %s never buildable!", task) | 2030 | logger.error("Task %s never buildable!", task) |
2029 | if task not in self.runq_running: | 2031 | elif task not in self.runq_running: |
2030 | logger.error("Task %s never ran!", task) | 2032 | logger.error("Task %s never ran!", task) |
2031 | if task not in self.runq_complete: | 2033 | elif task not in self.runq_complete: |
2032 | logger.error("Task %s never completed!", task) | 2034 | logger.error("Task %s never completed!", task) |
2033 | self.rq.state = runQueueComplete | 2035 | self.rq.state = runQueueComplete |
2034 | 2036 | ||
@@ -2070,7 +2072,42 @@ class RunQueueExecute: | |||
2070 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) | 2072 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) |
2071 | return taskdepdata | 2073 | return taskdepdata |
2072 | 2074 | ||
2073 | def scenequeue_updatecounters(self, task, fail = False): | 2075 | def scenequeue_process_notcovered(self, task): |
2076 | logger.debug(1, 'Not skipping setscene task %s', task) | ||
2077 | if len(self.rqdata.runtaskentries[task].depends) == 0: | ||
2078 | self.setbuildable(task) | ||
2079 | notcovered = set([task]) | ||
2080 | while notcovered: | ||
2081 | new = set() | ||
2082 | for t in notcovered: | ||
2083 | for deptask in self.rqdata.runtaskentries[t].depends: | ||
2084 | if deptask in notcovered or deptask in new or deptask in self.rqdata.runq_setscene_tids or deptask in self.tasks_notcovered: | ||
2085 | continue | ||
2086 | logger.debug(1, 'Task %s depends on non-setscene task %s so not skipping' % (t, deptask)) | ||
2087 | new.add(deptask) | ||
2088 | self.tasks_notcovered.add(deptask) | ||
2089 | if len(self.rqdata.runtaskentries[deptask].depends) == 0: | ||
2090 | self.setbuildable(deptask) | ||
2091 | notcovered = new | ||
2092 | |||
2093 | def scenequeue_process_unskippable(self, task): | ||
2094 | # Look up the dependency chain for non-setscene things which depend on this task | ||
2095 | # and mark as 'done'/notcovered | ||
2096 | ready = set([task]) | ||
2097 | while ready: | ||
2098 | new = set() | ||
2099 | for t in ready: | ||
2100 | for deptask in self.rqdata.runtaskentries[t].revdeps: | ||
2101 | if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids: | ||
2102 | continue | ||
2103 | if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done): | ||
2104 | new.add(deptask) | ||
2105 | self.tasks_scenequeue_done.add(deptask) | ||
2106 | self.tasks_notcovered.add(deptask) | ||
2107 | #logger.warning("Up: " + str(deptask)) | ||
2108 | ready = new | ||
2109 | |||
2110 | def scenequeue_updatecounters(self, task, fail=False): | ||
2074 | for dep in self.sqdata.sq_deps[task]: | 2111 | for dep in self.sqdata.sq_deps[task]: |
2075 | if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: | 2112 | if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: |
2076 | logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) | 2113 | logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) |
@@ -2083,6 +2120,43 @@ class RunQueueExecute: | |||
2083 | if len(self.sqdata.sq_revdeps2[dep]) == 0: | 2120 | if len(self.sqdata.sq_revdeps2[dep]) == 0: |
2084 | self.sq_buildable.add(dep) | 2121 | self.sq_buildable.add(dep) |
2085 | 2122 | ||
2123 | next = set([task]) | ||
2124 | while next: | ||
2125 | new = set() | ||
2126 | for t in next: | ||
2127 | self.tasks_scenequeue_done.add(t) | ||
2128 | # Look down the dependency chain for non-setscene things which this task depends on | ||
2129 | # and mark as 'done' | ||
2130 | for dep in self.rqdata.runtaskentries[t].depends: | ||
2131 | if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done: | ||
2132 | continue | ||
2133 | if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done): | ||
2134 | new.add(dep) | ||
2135 | #logger.warning(" Down: " + dep) | ||
2136 | next = new | ||
2137 | |||
2138 | if task in self.sqdata.unskippable: | ||
2139 | self.scenequeue_process_unskippable(task) | ||
2140 | |||
2141 | if task in self.scenequeue_notcovered: | ||
2142 | self.scenequeue_process_notcovered(task) | ||
2143 | elif task in self.scenequeue_covered: | ||
2144 | logger.debug(1, 'Queued setscene task %s', task) | ||
2145 | self.coveredtopocess.add(task) | ||
2146 | |||
2147 | for task in self.coveredtopocess.copy(): | ||
2148 | if self.sqdata.sq_covered_tasks[task].issubset(self.tasks_scenequeue_done): | ||
2149 | logger.debug(1, 'Processing setscene task %s', task) | ||
2150 | covered = self.sqdata.sq_covered_tasks[task] | ||
2151 | covered.add(task) | ||
2152 | # Remove notcovered tasks | ||
2153 | covered.difference_update(self.tasks_notcovered) | ||
2154 | self.tasks_covered.update(covered) | ||
2155 | self.coveredtopocess.remove(task) | ||
2156 | for tid in covered: | ||
2157 | if len(self.rqdata.runtaskentries[tid].depends) == 0: | ||
2158 | self.setbuildable(tid) | ||
2159 | |||
2086 | def sq_task_completeoutright(self, task): | 2160 | def sq_task_completeoutright(self, task): |
2087 | """ | 2161 | """ |
2088 | Mark a task as completed | 2162 | Mark a task as completed |
@@ -2113,6 +2187,7 @@ class RunQueueExecute: | |||
2113 | self.sq_stats.taskFailed() | 2187 | self.sq_stats.taskFailed() |
2114 | bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData) | 2188 | bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData) |
2115 | self.scenequeue_notcovered.add(task) | 2189 | self.scenequeue_notcovered.add(task) |
2190 | self.tasks_notcovered.add(task) | ||
2116 | self.scenequeue_updatecounters(task, True) | 2191 | self.scenequeue_updatecounters(task, True) |
2117 | self.sq_check_taskfail(task) | 2192 | self.sq_check_taskfail(task) |
2118 | 2193 | ||
@@ -2122,6 +2197,7 @@ class RunQueueExecute: | |||
2122 | self.sq_stats.taskSkipped() | 2197 | self.sq_stats.taskSkipped() |
2123 | self.sq_stats.taskCompleted() | 2198 | self.sq_stats.taskCompleted() |
2124 | self.scenequeue_notcovered.add(task) | 2199 | self.scenequeue_notcovered.add(task) |
2200 | self.tasks_notcovered.add(task) | ||
2125 | self.scenequeue_updatecounters(task, True) | 2201 | self.scenequeue_updatecounters(task, True) |
2126 | 2202 | ||
2127 | def sq_task_skip(self, task): | 2203 | def sq_task_skip(self, task): |
@@ -2136,6 +2212,9 @@ class RunQueueExecute: | |||
2136 | Run the tasks in a queue prepared by prepare_runqueue | 2212 | Run the tasks in a queue prepared by prepare_runqueue |
2137 | """ | 2213 | """ |
2138 | 2214 | ||
2215 | if self.sqdone: | ||
2216 | return True | ||
2217 | |||
2139 | self.rq.read_workers() | 2218 | self.rq.read_workers() |
2140 | 2219 | ||
2141 | task = None | 2220 | task = None |
@@ -2209,7 +2288,7 @@ class RunQueueExecute: | |||
2209 | if self.can_start_task(): | 2288 | if self.can_start_task(): |
2210 | return True | 2289 | return True |
2211 | 2290 | ||
2212 | if self.sq_stats.active > 0: | 2291 | if self.stats.active > 0 or self.sq_stats.active > 0: |
2213 | self.rq.read_workers() | 2292 | self.rq.read_workers() |
2214 | return self.rq.active_fds() | 2293 | return self.rq.active_fds() |
2215 | 2294 | ||
@@ -2221,11 +2300,14 @@ class RunQueueExecute: | |||
2221 | 2300 | ||
2222 | logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) | 2301 | logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) |
2223 | 2302 | ||
2224 | self.rq.state = runQueueRunInit | ||
2225 | |||
2226 | completeevent = sceneQueueComplete(self.sq_stats, self.rq) | 2303 | completeevent = sceneQueueComplete(self.sq_stats, self.rq) |
2227 | bb.event.fire(completeevent, self.cfgData) | 2304 | bb.event.fire(completeevent, self.cfgData) |
2228 | 2305 | ||
2306 | if self.cooker.configuration.setsceneonly: | ||
2307 | self.rq.state = runQueueComplete | ||
2308 | |||
2309 | self.sqdone = True | ||
2310 | |||
2229 | return True | 2311 | return True |
2230 | 2312 | ||
2231 | def sq_build_taskdepdata(self, task): | 2313 | def sq_build_taskdepdata(self, task): |
@@ -2366,6 +2448,12 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): | |||
2366 | if tid in rqdata.runq_setscene_tids: | 2448 | if tid in rqdata.runq_setscene_tids: |
2367 | continue | 2449 | continue |
2368 | sqdata.unskippable.remove(tid) | 2450 | sqdata.unskippable.remove(tid) |
2451 | if len(rqdata.runtaskentries[tid].depends) == 0: | ||
2452 | # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable | ||
2453 | sqrq.tasks_notcovered.add(tid) | ||
2454 | sqrq.tasks_scenequeue_done.add(tid) | ||
2455 | sqrq.setbuildable(tid) | ||
2456 | sqrq.scenequeue_process_unskippable(tid) | ||
2369 | sqdata.unskippable |= rqdata.runtaskentries[tid].depends | 2457 | sqdata.unskippable |= rqdata.runtaskentries[tid].depends |
2370 | new = True | 2458 | new = True |
2371 | 2459 | ||
@@ -2499,33 +2587,6 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): | |||
2499 | logger.debug(2, 'No package found, so skipping setscene task %s', tid) | 2587 | logger.debug(2, 'No package found, so skipping setscene task %s', tid) |
2500 | sqdata.outrightfail.append(tid) | 2588 | sqdata.outrightfail.append(tid) |
2501 | 2589 | ||
2502 | def start_runqueue_tasks(rqexec): | ||
2503 | # Mark initial buildable tasks | ||
2504 | for tid in rqexec.rqdata.runtaskentries: | ||
2505 | if len(rqexec.rqdata.runtaskentries[tid].depends) == 0: | ||
2506 | rqexec.setbuildable(tid) | ||
2507 | if len(rqexec.rqdata.runtaskentries[tid].revdeps) > 0 and rqexec.rqdata.runtaskentries[tid].revdeps.issubset(rqexec.tasks_covered): | ||
2508 | rqexec.tasks_covered.add(tid) | ||
2509 | |||
2510 | found = True | ||
2511 | while found: | ||
2512 | found = False | ||
2513 | for tid in rqexec.rqdata.runtaskentries: | ||
2514 | if tid in rqexec.tasks_covered: | ||
2515 | continue | ||
2516 | logger.debug(1, 'Considering %s: %s' % (tid, str(rqexec.rqdata.runtaskentries[tid].revdeps))) | ||
2517 | |||
2518 | if len(rqexec.rqdata.runtaskentries[tid].revdeps) > 0 and rqexec.rqdata.runtaskentries[tid].revdeps.issubset(rqexec.tasks_covered): | ||
2519 | if tid in rqexec.scenequeue_notcovered: | ||
2520 | continue | ||
2521 | found = True | ||
2522 | rqexec.tasks_covered.add(tid) | ||
2523 | |||
2524 | logger.debug(1, 'Skip list %s', sorted(rqexec.tasks_covered)) | ||
2525 | |||
2526 | for task in self.rq.scenequeue_notcovered: | ||
2527 | logger.debug(1, 'Not skipping task %s', task) | ||
2528 | |||
2529 | class TaskFailure(Exception): | 2590 | class TaskFailure(Exception): |
2530 | """ | 2591 | """ |
2531 | Exception raised when a task in a runqueue fails | 2592 | Exception raised when a task in a runqueue fails |