summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2019-07-04 00:14:02 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2019-07-15 10:28:12 +0100
commitcf829a5f660392bc83a9175ab4900b170496f349 (patch)
tree91f6183780a5508a5a1e8074e42c7e44c5276b23 /bitbake/lib/bb/runqueue.py
parent491c6049e06bf1cb1bb4413ba87ce171a3fcd6b4 (diff)
downloadpoky-cf829a5f660392bc83a9175ab4900b170496f349.tar.gz
bitbake: runqueue: Merge the queues and execute setscene and normal tasks in parallel
This is the serious functionality change in this runqueue patch series of changes. Rather than two phases of execution, the scenequeue setscene phase, followed by normal task exeuction, this change allows them to execute in parallel together. To do this we need to handle marking of tasks as covered/uncovered in a piecemeal fashion on a task by task basis rather than in a single function. The code will block normal task exeuction until any setcene task which could cover that task is executed and its status is known. There is a slight optimisation which could be possible here at the risk of races but that doesn't seem worthwhile. The state engine isn't entirely cleaned up in this commit (see FIXME) and the setscenewhitelist functionality is broken by it (see following patches) however its good enough to test with normal workflows. (Bitbake rev: 58b3f0847cc2d47e76f74d59dcbbf78fe41b118b) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py169
1 files changed, 115 insertions, 54 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index c1c4fd1b81..aafb6ffa58 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -142,7 +142,7 @@ class RunQueueScheduler(object):
142 Return the id of the first task we find that is buildable 142 Return the id of the first task we find that is buildable
143 """ 143 """
144 self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] 144 self.buildable = [x for x in self.buildable if x not in self.rq.runq_running]
145 buildable = self.buildable 145 buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)]
146 if not buildable: 146 if not buildable:
147 return None 147 return None
148 148
@@ -1454,25 +1454,18 @@ class RunQueue:
1454 1454
1455 # If we don't have any setscene functions, skip execution 1455 # If we don't have any setscene functions, skip execution
1456 if len(self.rqdata.runq_setscene_tids) == 0: 1456 if len(self.rqdata.runq_setscene_tids) == 0:
1457 self.rqdata.init_progress_reporter.finish() 1457 logger.info('No setscene tasks')
1458 self.state = runQueueRunInit 1458 for tid in self.rqdata.runtaskentries:
1459 else: 1459 if len(self.rqdata.runtaskentries[tid].depends) == 0:
1460 logger.info('Executing SetScene Tasks') 1460 self.rqexe.setbuildable(tid)
1461 self.state = runQueueSceneRun 1461 self.rqexe.tasks_notcovered.add(tid)
1462 1462 self.rqexe.sqdone = True
1463 if self.state is runQueueSceneRun: 1463 logger.info('Executing Tasks')
1464 retval = self.rqexe.sq_execute()
1465
1466 if self.state is runQueueRunInit:
1467 if self.cooker.configuration.setsceneonly:
1468 self.state = runQueueComplete
1469
1470 if self.state is runQueueRunInit:
1471 logger.info("Executing RunQueue Tasks")
1472 start_runqueue_tasks(self.rqexe)
1473 self.state = runQueueRunning 1464 self.state = runQueueRunning
1474 1465
1475 if self.state is runQueueRunning: 1466 if self.state is runQueueRunning:
1467 retval = self.rqexe.sq_execute()
1468 # FIXME revtal
1476 retval = self.rqexe.execute() 1469 retval = self.rqexe.execute()
1477 1470
1478 if self.state is runQueueCleanUp: 1471 if self.state is runQueueCleanUp:
@@ -1757,6 +1750,8 @@ class RunQueueExecute:
1757 1750
1758 self.stampcache = {} 1751 self.stampcache = {}
1759 1752
1753 self.sqdone = False
1754
1760 self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) 1755 self.stats = RunQueueStats(len(self.rqdata.runtaskentries))
1761 self.sq_stats = RunQueueStats(len(self.rqdata.runq_setscene_tids)) 1756 self.sq_stats = RunQueueStats(len(self.rqdata.runq_setscene_tids))
1762 1757
@@ -1772,12 +1767,12 @@ class RunQueueExecute:
1772 self.scenequeue_covered = set() 1767 self.scenequeue_covered = set()
1773 # List of tasks which are covered (including setscene ones) 1768 # List of tasks which are covered (including setscene ones)
1774 self.tasks_covered = set() 1769 self.tasks_covered = set()
1770 self.tasks_scenequeue_done = set()
1775 self.scenequeue_notcovered = set() 1771 self.scenequeue_notcovered = set()
1772 self.tasks_notcovered = set()
1776 self.scenequeue_notneeded = set() 1773 self.scenequeue_notneeded = set()
1777 1774
1778 if len(self.rqdata.runq_setscene_tids) > 0: 1775 self.coveredtopocess = set()
1779 self.sqdata = SQData()
1780 build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
1781 1776
1782 schedulers = self.get_schedulers() 1777 schedulers = self.get_schedulers()
1783 for scheduler in schedulers: 1778 for scheduler in schedulers:
@@ -1789,6 +1784,10 @@ class RunQueueExecute:
1789 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % 1784 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
1790 (self.scheduler, ", ".join(obj.name for obj in schedulers))) 1785 (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1791 1786
1787 if len(self.rqdata.runq_setscene_tids) > 0:
1788 self.sqdata = SQData()
1789 build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
1790
1792 def runqueue_process_waitpid(self, task, status): 1791 def runqueue_process_waitpid(self, task, status):
1793 1792
1794 # self.build_stamps[pid] may not exist when use shared work directory. 1793 # self.build_stamps[pid] may not exist when use shared work directory.
@@ -1951,6 +1950,9 @@ class RunQueueExecute:
1951 if process_setscenewhitelist(self.rq, self.rqdata, self.stampcache, self.sched, self): 1950 if process_setscenewhitelist(self.rq, self.rqdata, self.stampcache, self.sched, self):
1952 return True 1951 return True
1953 1952
1953 if self.cooker.configuration.setsceneonly:
1954 return True
1955
1954 self.rq.read_workers() 1956 self.rq.read_workers()
1955 1957
1956 if self.stats.total == 0: 1958 if self.stats.total == 0:
@@ -2014,7 +2016,7 @@ class RunQueueExecute:
2014 if self.can_start_task(): 2016 if self.can_start_task():
2015 return True 2017 return True
2016 2018
2017 if self.stats.active > 0: 2019 if self.stats.active > 0 or self.sq_stats.active > 0:
2018 self.rq.read_workers() 2020 self.rq.read_workers()
2019 return self.rq.active_fds() 2021 return self.rq.active_fds()
2020 2022
@@ -2026,9 +2028,9 @@ class RunQueueExecute:
2026 for task in self.rqdata.runtaskentries: 2028 for task in self.rqdata.runtaskentries:
2027 if task not in self.runq_buildable: 2029 if task not in self.runq_buildable:
2028 logger.error("Task %s never buildable!", task) 2030 logger.error("Task %s never buildable!", task)
2029 if task not in self.runq_running: 2031 elif task not in self.runq_running:
2030 logger.error("Task %s never ran!", task) 2032 logger.error("Task %s never ran!", task)
2031 if task not in self.runq_complete: 2033 elif task not in self.runq_complete:
2032 logger.error("Task %s never completed!", task) 2034 logger.error("Task %s never completed!", task)
2033 self.rq.state = runQueueComplete 2035 self.rq.state = runQueueComplete
2034 2036
@@ -2070,7 +2072,42 @@ class RunQueueExecute:
2070 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2072 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2071 return taskdepdata 2073 return taskdepdata
2072 2074
2073 def scenequeue_updatecounters(self, task, fail = False): 2075 def scenequeue_process_notcovered(self, task):
2076 logger.debug(1, 'Not skipping setscene task %s', task)
2077 if len(self.rqdata.runtaskentries[task].depends) == 0:
2078 self.setbuildable(task)
2079 notcovered = set([task])
2080 while notcovered:
2081 new = set()
2082 for t in notcovered:
2083 for deptask in self.rqdata.runtaskentries[t].depends:
2084 if deptask in notcovered or deptask in new or deptask in self.rqdata.runq_setscene_tids or deptask in self.tasks_notcovered:
2085 continue
2086 logger.debug(1, 'Task %s depends on non-setscene task %s so not skipping' % (t, deptask))
2087 new.add(deptask)
2088 self.tasks_notcovered.add(deptask)
2089 if len(self.rqdata.runtaskentries[deptask].depends) == 0:
2090 self.setbuildable(deptask)
2091 notcovered = new
2092
2093 def scenequeue_process_unskippable(self, task):
2094 # Look up the dependency chain for non-setscene things which depend on this task
2095 # and mark as 'done'/notcovered
2096 ready = set([task])
2097 while ready:
2098 new = set()
2099 for t in ready:
2100 for deptask in self.rqdata.runtaskentries[t].revdeps:
2101 if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids:
2102 continue
2103 if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done):
2104 new.add(deptask)
2105 self.tasks_scenequeue_done.add(deptask)
2106 self.tasks_notcovered.add(deptask)
2107 #logger.warning("Up: " + str(deptask))
2108 ready = new
2109
2110 def scenequeue_updatecounters(self, task, fail=False):
2074 for dep in self.sqdata.sq_deps[task]: 2111 for dep in self.sqdata.sq_deps[task]:
2075 if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: 2112 if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]:
2076 logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) 2113 logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
@@ -2083,6 +2120,43 @@ class RunQueueExecute:
2083 if len(self.sqdata.sq_revdeps2[dep]) == 0: 2120 if len(self.sqdata.sq_revdeps2[dep]) == 0:
2084 self.sq_buildable.add(dep) 2121 self.sq_buildable.add(dep)
2085 2122
2123 next = set([task])
2124 while next:
2125 new = set()
2126 for t in next:
2127 self.tasks_scenequeue_done.add(t)
2128 # Look down the dependency chain for non-setscene things which this task depends on
2129 # and mark as 'done'
2130 for dep in self.rqdata.runtaskentries[t].depends:
2131 if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done:
2132 continue
2133 if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done):
2134 new.add(dep)
2135 #logger.warning(" Down: " + dep)
2136 next = new
2137
2138 if task in self.sqdata.unskippable:
2139 self.scenequeue_process_unskippable(task)
2140
2141 if task in self.scenequeue_notcovered:
2142 self.scenequeue_process_notcovered(task)
2143 elif task in self.scenequeue_covered:
2144 logger.debug(1, 'Queued setscene task %s', task)
2145 self.coveredtopocess.add(task)
2146
2147 for task in self.coveredtopocess.copy():
2148 if self.sqdata.sq_covered_tasks[task].issubset(self.tasks_scenequeue_done):
2149 logger.debug(1, 'Processing setscene task %s', task)
2150 covered = self.sqdata.sq_covered_tasks[task]
2151 covered.add(task)
2152 # Remove notcovered tasks
2153 covered.difference_update(self.tasks_notcovered)
2154 self.tasks_covered.update(covered)
2155 self.coveredtopocess.remove(task)
2156 for tid in covered:
2157 if len(self.rqdata.runtaskentries[tid].depends) == 0:
2158 self.setbuildable(tid)
2159
2086 def sq_task_completeoutright(self, task): 2160 def sq_task_completeoutright(self, task):
2087 """ 2161 """
2088 Mark a task as completed 2162 Mark a task as completed
@@ -2113,6 +2187,7 @@ class RunQueueExecute:
2113 self.sq_stats.taskFailed() 2187 self.sq_stats.taskFailed()
2114 bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData) 2188 bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData)
2115 self.scenequeue_notcovered.add(task) 2189 self.scenequeue_notcovered.add(task)
2190 self.tasks_notcovered.add(task)
2116 self.scenequeue_updatecounters(task, True) 2191 self.scenequeue_updatecounters(task, True)
2117 self.sq_check_taskfail(task) 2192 self.sq_check_taskfail(task)
2118 2193
@@ -2122,6 +2197,7 @@ class RunQueueExecute:
2122 self.sq_stats.taskSkipped() 2197 self.sq_stats.taskSkipped()
2123 self.sq_stats.taskCompleted() 2198 self.sq_stats.taskCompleted()
2124 self.scenequeue_notcovered.add(task) 2199 self.scenequeue_notcovered.add(task)
2200 self.tasks_notcovered.add(task)
2125 self.scenequeue_updatecounters(task, True) 2201 self.scenequeue_updatecounters(task, True)
2126 2202
2127 def sq_task_skip(self, task): 2203 def sq_task_skip(self, task):
@@ -2136,6 +2212,9 @@ class RunQueueExecute:
2136 Run the tasks in a queue prepared by prepare_runqueue 2212 Run the tasks in a queue prepared by prepare_runqueue
2137 """ 2213 """
2138 2214
2215 if self.sqdone:
2216 return True
2217
2139 self.rq.read_workers() 2218 self.rq.read_workers()
2140 2219
2141 task = None 2220 task = None
@@ -2209,7 +2288,7 @@ class RunQueueExecute:
2209 if self.can_start_task(): 2288 if self.can_start_task():
2210 return True 2289 return True
2211 2290
2212 if self.sq_stats.active > 0: 2291 if self.stats.active > 0 or self.sq_stats.active > 0:
2213 self.rq.read_workers() 2292 self.rq.read_workers()
2214 return self.rq.active_fds() 2293 return self.rq.active_fds()
2215 2294
@@ -2221,11 +2300,14 @@ class RunQueueExecute:
2221 2300
2222 logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) 2301 logger.debug(1, 'We can skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
2223 2302
2224 self.rq.state = runQueueRunInit
2225
2226 completeevent = sceneQueueComplete(self.sq_stats, self.rq) 2303 completeevent = sceneQueueComplete(self.sq_stats, self.rq)
2227 bb.event.fire(completeevent, self.cfgData) 2304 bb.event.fire(completeevent, self.cfgData)
2228 2305
2306 if self.cooker.configuration.setsceneonly:
2307 self.rq.state = runQueueComplete
2308
2309 self.sqdone = True
2310
2229 return True 2311 return True
2230 2312
2231 def sq_build_taskdepdata(self, task): 2313 def sq_build_taskdepdata(self, task):
@@ -2366,6 +2448,12 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
2366 if tid in rqdata.runq_setscene_tids: 2448 if tid in rqdata.runq_setscene_tids:
2367 continue 2449 continue
2368 sqdata.unskippable.remove(tid) 2450 sqdata.unskippable.remove(tid)
2451 if len(rqdata.runtaskentries[tid].depends) == 0:
2452 # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
2453 sqrq.tasks_notcovered.add(tid)
2454 sqrq.tasks_scenequeue_done.add(tid)
2455 sqrq.setbuildable(tid)
2456 sqrq.scenequeue_process_unskippable(tid)
2369 sqdata.unskippable |= rqdata.runtaskentries[tid].depends 2457 sqdata.unskippable |= rqdata.runtaskentries[tid].depends
2370 new = True 2458 new = True
2371 2459
@@ -2499,33 +2587,6 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
2499 logger.debug(2, 'No package found, so skipping setscene task %s', tid) 2587 logger.debug(2, 'No package found, so skipping setscene task %s', tid)
2500 sqdata.outrightfail.append(tid) 2588 sqdata.outrightfail.append(tid)
2501 2589
2502def start_runqueue_tasks(rqexec):
2503 # Mark initial buildable tasks
2504 for tid in rqexec.rqdata.runtaskentries:
2505 if len(rqexec.rqdata.runtaskentries[tid].depends) == 0:
2506 rqexec.setbuildable(tid)
2507 if len(rqexec.rqdata.runtaskentries[tid].revdeps) > 0 and rqexec.rqdata.runtaskentries[tid].revdeps.issubset(rqexec.tasks_covered):
2508 rqexec.tasks_covered.add(tid)
2509
2510 found = True
2511 while found:
2512 found = False
2513 for tid in rqexec.rqdata.runtaskentries:
2514 if tid in rqexec.tasks_covered:
2515 continue
2516 logger.debug(1, 'Considering %s: %s' % (tid, str(rqexec.rqdata.runtaskentries[tid].revdeps)))
2517
2518 if len(rqexec.rqdata.runtaskentries[tid].revdeps) > 0 and rqexec.rqdata.runtaskentries[tid].revdeps.issubset(rqexec.tasks_covered):
2519 if tid in rqexec.scenequeue_notcovered:
2520 continue
2521 found = True
2522 rqexec.tasks_covered.add(tid)
2523
2524 logger.debug(1, 'Skip list %s', sorted(rqexec.tasks_covered))
2525
2526 for task in self.rq.scenequeue_notcovered:
2527 logger.debug(1, 'Not skipping task %s', task)
2528
2529class TaskFailure(Exception): 2590class TaskFailure(Exception):
2530 """ 2591 """
2531 Exception raised when a task in a runqueue fails 2592 Exception raised when a task in a runqueue fails