diff options
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 185 |
1 files changed, 154 insertions, 31 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 30cab5379e..886eef1f27 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -24,6 +24,7 @@ import pickle | |||
24 | from multiprocessing import Process | 24 | from multiprocessing import Process |
25 | import shlex | 25 | import shlex |
26 | import pprint | 26 | import pprint |
27 | import time | ||
27 | 28 | ||
28 | bblogger = logging.getLogger("BitBake") | 29 | bblogger = logging.getLogger("BitBake") |
29 | logger = logging.getLogger("BitBake.RunQueue") | 30 | logger = logging.getLogger("BitBake.RunQueue") |
@@ -142,6 +143,55 @@ class RunQueueScheduler(object): | |||
142 | self.buildable.append(tid) | 143 | self.buildable.append(tid) |
143 | 144 | ||
144 | self.rev_prio_map = None | 145 | self.rev_prio_map = None |
146 | self.is_pressure_usable() | ||
147 | |||
148 | def is_pressure_usable(self): | ||
149 | """ | ||
150 | If monitoring pressure, set self.check_pressure to True if the pressure files can be opened and read. For example | ||
151 | openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported) | ||
152 | is returned. | ||
153 | """ | ||
154 | if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure: | ||
155 | try: | ||
156 | with open("/proc/pressure/cpu") as cpu_pressure_fds, \ | ||
157 | open("/proc/pressure/io") as io_pressure_fds, \ | ||
158 | open("/proc/pressure/memory") as memory_pressure_fds: | ||
159 | |||
160 | self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] | ||
161 | self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] | ||
162 | self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] | ||
163 | self.prev_pressure_time = time.time() | ||
164 | self.check_pressure = True | ||
165 | except: | ||
166 | bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure") | ||
167 | self.check_pressure = False | ||
168 | else: | ||
169 | self.check_pressure = False | ||
170 | |||
171 | def exceeds_max_pressure(self): | ||
172 | """ | ||
173 | Monitor the difference in total pressure at least once per second, if | ||
174 | BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True if above threshold. | ||
175 | """ | ||
176 | if self.check_pressure: | ||
177 | with open("/proc/pressure/cpu") as cpu_pressure_fds, \ | ||
178 | open("/proc/pressure/io") as io_pressure_fds, \ | ||
179 | open("/proc/pressure/memory") as memory_pressure_fds: | ||
180 | # extract "total" from /proc/pressure/{cpu|io} | ||
181 | curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] | ||
182 | curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] | ||
183 | curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] | ||
184 | exceeds_cpu_pressure = self.rq.max_cpu_pressure and (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) > self.rq.max_cpu_pressure | ||
185 | exceeds_io_pressure = self.rq.max_io_pressure and (float(curr_io_pressure) - float(self.prev_io_pressure)) > self.rq.max_io_pressure | ||
186 | exceeds_memory_pressure = self.rq.max_memory_pressure and (float(curr_memory_pressure) - float(self.prev_memory_pressure)) > self.rq.max_memory_pressure | ||
187 | now = time.time() | ||
188 | if now - self.prev_pressure_time > 1.0: | ||
189 | self.prev_cpu_pressure = curr_cpu_pressure | ||
190 | self.prev_io_pressure = curr_io_pressure | ||
191 | self.prev_memory_pressure = curr_memory_pressure | ||
192 | self.prev_pressure_time = now | ||
193 | return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure) | ||
194 | return False | ||
145 | 195 | ||
146 | def next_buildable_task(self): | 196 | def next_buildable_task(self): |
147 | """ | 197 | """ |
@@ -155,6 +205,12 @@ class RunQueueScheduler(object): | |||
155 | if not buildable: | 205 | if not buildable: |
156 | return None | 206 | return None |
157 | 207 | ||
208 | # Bitbake requires that at least one task be active. Only check for pressure if | ||
209 | # this is the case, otherwise the pressure limitation could result in no tasks | ||
210 | # being active and no new tasks started thereby, at times, breaking the scheduler. | ||
211 | if self.rq.stats.active and self.exceeds_max_pressure(): | ||
212 | return None | ||
213 | |||
158 | # Filter out tasks that have a max number of threads that have been exceeded | 214 | # Filter out tasks that have a max number of threads that have been exceeded |
159 | skip_buildable = {} | 215 | skip_buildable = {} |
160 | for running in self.rq.runq_running.difference(self.rq.runq_complete): | 216 | for running in self.rq.runq_running.difference(self.rq.runq_complete): |
@@ -1256,8 +1312,8 @@ class RunQueue: | |||
1256 | "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv, | 1312 | "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv, |
1257 | "sigdata" : bb.parse.siggen.get_taskdata(), | 1313 | "sigdata" : bb.parse.siggen.get_taskdata(), |
1258 | "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, | 1314 | "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, |
1259 | "logdefaultverbose" : bb.msg.loggerDefaultVerbose, | 1315 | "build_verbose_shell" : self.cooker.configuration.build_verbose_shell, |
1260 | "logdefaultverboselogs" : bb.msg.loggerVerboseLogs, | 1316 | "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout, |
1261 | "logdefaultdomain" : bb.msg.loggerDefaultDomains, | 1317 | "logdefaultdomain" : bb.msg.loggerDefaultDomains, |
1262 | "prhost" : self.cooker.prhost, | 1318 | "prhost" : self.cooker.prhost, |
1263 | "buildname" : self.cfgData.getVar("BUILDNAME"), | 1319 | "buildname" : self.cfgData.getVar("BUILDNAME"), |
@@ -1700,6 +1756,9 @@ class RunQueueExecute: | |||
1700 | 1756 | ||
1701 | self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) | 1757 | self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) |
1702 | self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" | 1758 | self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" |
1759 | self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU") | ||
1760 | self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO") | ||
1761 | self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY") | ||
1703 | 1762 | ||
1704 | self.sq_buildable = set() | 1763 | self.sq_buildable = set() |
1705 | self.sq_running = set() | 1764 | self.sq_running = set() |
@@ -1735,6 +1794,29 @@ class RunQueueExecute: | |||
1735 | if self.number_tasks <= 0: | 1794 | if self.number_tasks <= 0: |
1736 | bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) | 1795 | bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) |
1737 | 1796 | ||
1797 | lower_limit = 1.0 | ||
1798 | upper_limit = 1000000.0 | ||
1799 | if self.max_cpu_pressure: | ||
1800 | self.max_cpu_pressure = float(self.max_cpu_pressure) | ||
1801 | if self.max_cpu_pressure < lower_limit: | ||
1802 | bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit)) | ||
1803 | if self.max_cpu_pressure > upper_limit: | ||
1804 | bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure)) | ||
1805 | |||
1806 | if self.max_io_pressure: | ||
1807 | self.max_io_pressure = float(self.max_io_pressure) | ||
1808 | if self.max_io_pressure < lower_limit: | ||
1809 | bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit)) | ||
1810 | if self.max_io_pressure > upper_limit: | ||
1811 | bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) | ||
1812 | |||
1813 | if self.max_memory_pressure: | ||
1814 | self.max_memory_pressure = float(self.max_memory_pressure) | ||
1815 | if self.max_memory_pressure < lower_limit: | ||
1816 | bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit)) | ||
1817 | if self.max_memory_pressure > upper_limit: | ||
1818 | bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_memory_pressure)) | ||
1819 | |||
1738 | # List of setscene tasks which we've covered | 1820 | # List of setscene tasks which we've covered |
1739 | self.scenequeue_covered = set() | 1821 | self.scenequeue_covered = set() |
1740 | # List of tasks which are covered (including setscene ones) | 1822 | # List of tasks which are covered (including setscene ones) |
@@ -1893,6 +1975,20 @@ class RunQueueExecute: | |||
1893 | self.setbuildable(revdep) | 1975 | self.setbuildable(revdep) |
1894 | logger.debug(1, "Marking task %s as buildable", revdep) | 1976 | logger.debug(1, "Marking task %s as buildable", revdep) |
1895 | 1977 | ||
1978 | found = None | ||
1979 | for t in sorted(self.sq_deferred.copy()): | ||
1980 | if self.sq_deferred[t] == task: | ||
1981 | # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task. | ||
1982 | # We shouldn't allow all to run at once as it is prone to races. | ||
1983 | if not found: | ||
1984 | bb.note("Deferred task %s now buildable" % t) | ||
1985 | del self.sq_deferred[t] | ||
1986 | update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) | ||
1987 | found = t | ||
1988 | else: | ||
1989 | bb.note("Deferring %s after %s" % (t, found)) | ||
1990 | self.sq_deferred[t] = found | ||
1991 | |||
1896 | def task_complete(self, task): | 1992 | def task_complete(self, task): |
1897 | self.stats.taskCompleted() | 1993 | self.stats.taskCompleted() |
1898 | bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) | 1994 | bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) |
@@ -1934,6 +2030,10 @@ class RunQueueExecute: | |||
1934 | logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) | 2030 | logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) |
1935 | err = True | 2031 | err = True |
1936 | 2032 | ||
2033 | for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered): | ||
2034 | # No task should end up in both covered and uncovered, that is a bug. | ||
2035 | logger.error("Setscene task %s in both covered and notcovered." % tid) | ||
2036 | |||
1937 | for tid in self.rqdata.runq_setscene_tids: | 2037 | for tid in self.rqdata.runq_setscene_tids: |
1938 | if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: | 2038 | if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: |
1939 | err = True | 2039 | err = True |
@@ -1998,8 +2098,6 @@ class RunQueueExecute: | |||
1998 | logger.debug(1, "%s didn't become valid, skipping setscene" % nexttask) | 2098 | logger.debug(1, "%s didn't become valid, skipping setscene" % nexttask) |
1999 | self.sq_task_failoutright(nexttask) | 2099 | self.sq_task_failoutright(nexttask) |
2000 | return True | 2100 | return True |
2001 | else: | ||
2002 | self.sqdata.outrightfail.remove(nexttask) | ||
2003 | if nexttask in self.sqdata.outrightfail: | 2101 | if nexttask in self.sqdata.outrightfail: |
2004 | logger.debug(2, 'No package found, so skipping setscene task %s', nexttask) | 2102 | logger.debug(2, 'No package found, so skipping setscene task %s', nexttask) |
2005 | self.sq_task_failoutright(nexttask) | 2103 | self.sq_task_failoutright(nexttask) |
@@ -2150,7 +2248,8 @@ class RunQueueExecute: | |||
2150 | if self.sq_deferred: | 2248 | if self.sq_deferred: |
2151 | tid = self.sq_deferred.pop(list(self.sq_deferred.keys())[0]) | 2249 | tid = self.sq_deferred.pop(list(self.sq_deferred.keys())[0]) |
2152 | logger.warning("Runqeueue deadlocked on deferred tasks, forcing task %s" % tid) | 2250 | logger.warning("Runqeueue deadlocked on deferred tasks, forcing task %s" % tid) |
2153 | self.sq_task_failoutright(tid) | 2251 | if tid not in self.runq_complete: |
2252 | self.sq_task_failoutright(tid) | ||
2154 | return True | 2253 | return True |
2155 | 2254 | ||
2156 | if len(self.failed_tids) != 0: | 2255 | if len(self.failed_tids) != 0: |
@@ -2264,10 +2363,16 @@ class RunQueueExecute: | |||
2264 | self.updated_taskhash_queue.remove((tid, unihash)) | 2363 | self.updated_taskhash_queue.remove((tid, unihash)) |
2265 | 2364 | ||
2266 | if unihash != self.rqdata.runtaskentries[tid].unihash: | 2365 | if unihash != self.rqdata.runtaskentries[tid].unihash: |
2267 | hashequiv_logger.verbose("Task %s unihash changed to %s" % (tid, unihash)) | 2366 | # Make sure we rehash any other tasks with the same task hash that we're deferred against. |
2268 | self.rqdata.runtaskentries[tid].unihash = unihash | 2367 | torehash = [tid] |
2269 | bb.parse.siggen.set_unihash(tid, unihash) | 2368 | for deftid in self.sq_deferred: |
2270 | toprocess.add(tid) | 2369 | if self.sq_deferred[deftid] == tid: |
2370 | torehash.append(deftid) | ||
2371 | for hashtid in torehash: | ||
2372 | hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash)) | ||
2373 | self.rqdata.runtaskentries[hashtid].unihash = unihash | ||
2374 | bb.parse.siggen.set_unihash(hashtid, unihash) | ||
2375 | toprocess.add(hashtid) | ||
2271 | 2376 | ||
2272 | # Work out all tasks which depend upon these | 2377 | # Work out all tasks which depend upon these |
2273 | total = set() | 2378 | total = set() |
@@ -2406,6 +2511,14 @@ class RunQueueExecute: | |||
2406 | 2511 | ||
2407 | if update_tasks: | 2512 | if update_tasks: |
2408 | self.sqdone = False | 2513 | self.sqdone = False |
2514 | for mc in sorted(self.sqdata.multiconfigs): | ||
2515 | for tid in sorted([t[0] for t in update_tasks]): | ||
2516 | if mc_from_tid(tid) != mc: | ||
2517 | continue | ||
2518 | h = pending_hash_index(tid, self.rqdata) | ||
2519 | if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]: | ||
2520 | self.sq_deferred[tid] = self.sqdata.hashes[h] | ||
2521 | bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h])) | ||
2409 | update_scenequeue_data([t[0] for t in update_tasks], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) | 2522 | update_scenequeue_data([t[0] for t in update_tasks], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) |
2410 | 2523 | ||
2411 | for (tid, harddepfail, origvalid) in update_tasks: | 2524 | for (tid, harddepfail, origvalid) in update_tasks: |
@@ -2421,6 +2534,9 @@ class RunQueueExecute: | |||
2421 | 2534 | ||
2422 | for dep in sorted(self.sqdata.sq_deps[task]): | 2535 | for dep in sorted(self.sqdata.sq_deps[task]): |
2423 | if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: | 2536 | if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]: |
2537 | if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered: | ||
2538 | # dependency could be already processed, e.g. noexec setscene task | ||
2539 | continue | ||
2424 | logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) | 2540 | logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) |
2425 | self.sq_task_failoutright(dep) | 2541 | self.sq_task_failoutright(dep) |
2426 | continue | 2542 | continue |
@@ -2743,6 +2859,19 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): | |||
2743 | sqdata.stamppresent = set() | 2859 | sqdata.stamppresent = set() |
2744 | sqdata.valid = set() | 2860 | sqdata.valid = set() |
2745 | 2861 | ||
2862 | sqdata.hashes = {} | ||
2863 | sqrq.sq_deferred = {} | ||
2864 | for mc in sorted(sqdata.multiconfigs): | ||
2865 | for tid in sorted(sqdata.sq_revdeps): | ||
2866 | if mc_from_tid(tid) != mc: | ||
2867 | continue | ||
2868 | h = pending_hash_index(tid, rqdata) | ||
2869 | if h not in sqdata.hashes: | ||
2870 | sqdata.hashes[h] = tid | ||
2871 | else: | ||
2872 | sqrq.sq_deferred[tid] = sqdata.hashes[h] | ||
2873 | bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h])) | ||
2874 | |||
2746 | update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True) | 2875 | update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True) |
2747 | 2876 | ||
2748 | def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): | 2877 | def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): |
@@ -2754,6 +2883,8 @@ def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, s | |||
2754 | sqdata.stamppresent.remove(tid) | 2883 | sqdata.stamppresent.remove(tid) |
2755 | if tid in sqdata.valid: | 2884 | if tid in sqdata.valid: |
2756 | sqdata.valid.remove(tid) | 2885 | sqdata.valid.remove(tid) |
2886 | if tid in sqdata.outrightfail: | ||
2887 | sqdata.outrightfail.remove(tid) | ||
2757 | 2888 | ||
2758 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) | 2889 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) |
2759 | 2890 | ||
@@ -2781,28 +2912,20 @@ def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, s | |||
2781 | 2912 | ||
2782 | sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) | 2913 | sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) |
2783 | 2914 | ||
2784 | sqdata.hashes = {} | 2915 | for tid in tids: |
2785 | for mc in sorted(sqdata.multiconfigs): | 2916 | if tid in sqdata.stamppresent: |
2786 | for tid in sorted(sqdata.sq_revdeps): | 2917 | continue |
2787 | if mc_from_tid(tid) != mc: | 2918 | if tid in sqdata.valid: |
2788 | continue | 2919 | continue |
2789 | if tid in sqdata.stamppresent: | 2920 | if tid in sqdata.noexec: |
2790 | continue | 2921 | continue |
2791 | if tid in sqdata.valid: | 2922 | if tid in sqrq.scenequeue_covered: |
2792 | continue | 2923 | continue |
2793 | if tid in sqdata.noexec: | 2924 | if tid in sqrq.scenequeue_notcovered: |
2794 | continue | 2925 | continue |
2795 | if tid in sqrq.scenequeue_notcovered: | 2926 | if tid in sqrq.sq_deferred: |
2796 | continue | 2927 | continue |
2797 | sqdata.outrightfail.add(tid) | 2928 | sqdata.outrightfail.add(tid) |
2798 | |||
2799 | h = pending_hash_index(tid, rqdata) | ||
2800 | if h not in sqdata.hashes: | ||
2801 | sqdata.hashes[h] = tid | ||
2802 | else: | ||
2803 | sqrq.sq_deferred[tid] = sqdata.hashes[h] | ||
2804 | bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h])) | ||
2805 | |||
2806 | 2929 | ||
2807 | class TaskFailure(Exception): | 2930 | class TaskFailure(Exception): |
2808 | """ | 2931 | """ |