import sys
import os
import unittest

# Make the package importable when the tests are launched from the source root.
sys.path.insert(0, os.getcwd())

import pybootchartgui.parsing as parsing
import pybootchartgui.process_tree as process_tree
import pybootchartgui.main as main

if sys.version_info >= (3, 0):
    # Python 3 has no separate ``long`` type; alias it so the same code runs
    # on both major versions.
    long = int


class TestProcessTree(unittest.TestCase):
    """Compare ProcessTree output against reference extract files.

    Each test applies one more tree transformation (merge_logger, prune,
    merge_exploders, merge_siblings, merge_runs) than the previous one and
    compares the flattened tree with the matching
    ``extract.processtree.*.log`` file in the example directory.
    """

    def setUp(self):
        """Parse ``proc_ps.log`` of example 1 and build a ProcessTree from it."""
        self.name = "Process tree unittest"
        # The example data is located relative to the script being run.
        self.rootdir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/')

        parser = main._mk_options_parser()
        options, args = parser.parse_args(['--q', self.rootdir])
        writer = main._mk_writer(options)
        trace = parsing.Trace(writer, args, options)

        parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log'))
        trace.compile(writer)
        self.processtree = process_tree.ProcessTree(
            writer, None, trace.ps_stats,
            trace.ps_stats.sample_period, None, options.prune, None, None, False,
            for_testing=True)

    def mk_fname(self, f):
        """Return the path of file *f* inside the example directory."""
        return os.path.join(self.rootdir, f)

    def flatten(self, process_tree):
        """Return all processes of *process_tree* as a flat, pre-order list.

        Every process is followed directly by the flattened contents of its
        ``child_list``.
        """
        flattened = []
        for p in process_tree:
            flattened.append(p)
            flattened.extend(self.flatten(p.child_list))
        return flattened

    def checkAgainstJavaExtract(self, filename, process_tree):
        """Assert that *process_tree* matches the reference file *filename*.

        Each line of the reference file holds tab-separated columns which are
        compared, in order, with: pid, command, start time, duration, number
        of children and number of samples of the corresponding process in the
        flattened tree.

        NOTE(review): ``zip`` stops at the shorter of the two sequences, so a
        difference in the number of entries is not detected here.
        """
        # The context manager closes the file even when an assertion fails.
        with open(filename) as test_data:
            for expected, actual in zip(test_data, self.flatten(process_tree)):
                tokens = expected.split('\t')
                # The tree's pid is compared after integer division by 1000
                # (presumably an internal pid scaling -- confirm in parsing).
                self.assertEqual(int(tokens[0]), actual.pid // 1000)
                self.assertEqual(tokens[1], actual.cmd)
                # Reference times are compared against 10x the tree's values
                # (presumably a unit conversion -- confirm).
                self.assertEqual(long(tokens[2]), 10 * actual.start_time)
                # assertTrue replaces the assert_ alias removed in Python 3.12.
                self.assertTrue(long(tokens[3]) - 10 * actual.duration < 5, "duration")
                self.assertEqual(int(tokens[4]), len(actual.child_list))
                self.assertEqual(int(tokens[5]), len(actual.samples))

    def testBuild(self):
        # The freshly built tree, without any further transformation.
        tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.1.log'), tree)

    def testMergeLogger(self):
        # Tree after merge_logger for 'bootchartd'.
        self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
        tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.2.log'), tree)

    def testPrune(self):
        # Tree after merge_logger followed by prune.
        self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
        self.processtree.prune(self.processtree.process_tree, None)
        tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3b.log'), tree)

    def testMergeExploders(self):
        # Tree after merge_logger, prune and merge_exploders for 'hwup'.
        self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
        self.processtree.prune(self.processtree.process_tree, None)
        self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
        tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3c.log'), tree)

    def testMergeSiblings(self):
        # Tree after merge_logger, prune, merge_exploders and merge_siblings.
        self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
        self.processtree.prune(self.processtree.process_tree, None)
        self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
        self.processtree.merge_siblings(self.processtree.process_tree)
        tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3d.log'), tree)

    def testMergeRuns(self):
        # Tree after the full sequence, ending with merge_runs.
        self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
        self.processtree.prune(self.processtree.process_tree, None)
        self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
        self.processtree.merge_siblings(self.processtree.process_tree)
        self.processtree.merge_runs(self.processtree.process_tree)
        tree = self.processtree.process_tree
        self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3e.log'), tree)


if __name__ == '__main__':
    unittest.main()