summaryrefslogtreecommitdiffstats
path: root/scripts/pybootchartgui/pybootchartgui/tests
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/pybootchartgui/pybootchartgui/tests')
-rw-r--r--scripts/pybootchartgui/pybootchartgui/tests/parser_test.py105
-rw-r--r--scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py92
2 files changed, 197 insertions, 0 deletions
diff --git a/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py b/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py
new file mode 100644
index 0000000000..00fb3bf797
--- /dev/null
+++ b/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py
@@ -0,0 +1,105 @@
1import sys, os, re, struct, operator, math
2from collections import defaultdict
3import unittest
4
5sys.path.insert(0, os.getcwd())
6
7import pybootchartgui.parsing as parsing
8import pybootchartgui.main as main
9
10debug = False
11
12def floatEq(f1, f2):
13 return math.fabs(f1-f2) < 0.00001
14
15bootchart_dir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/')
16parser = main._mk_options_parser()
17options, args = parser.parse_args(['--q', bootchart_dir])
18writer = main._mk_writer(options)
19
20class TestBCParser(unittest.TestCase):
21
22 def setUp(self):
23 self.name = "My first unittest"
24 self.rootdir = bootchart_dir
25
26 def mk_fname(self,f):
27 return os.path.join(self.rootdir, f)
28
29 def testParseHeader(self):
30 trace = parsing.Trace(writer, args, options)
31 state = parsing.parse_file(writer, trace, self.mk_fname('header'))
32 self.assertEqual(6, len(state.headers))
33 self.assertEqual(2, parsing.get_num_cpus(state.headers))
34
35 def test_parseTimedBlocks(self):
36 trace = parsing.Trace(writer, args, options)
37 state = parsing.parse_file(writer, trace, self.mk_fname('proc_diskstats.log'))
38 self.assertEqual(141, len(state.disk_stats))
39
40 def testParseProcPsLog(self):
41 trace = parsing.Trace(writer, args, options)
42 state = parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log'))
43 samples = state.ps_stats
44 processes = samples.process_map
45 sorted_processes = [processes[k] for k in sorted(processes.keys())]
46
47 ps_data = open(self.mk_fname('extract2.proc_ps.log'))
48 for index, line in enumerate(ps_data):
49 tokens = line.split();
50 process = sorted_processes[index]
51 if debug:
52 print(tokens[0:4])
53 print(process.pid / 1000, process.cmd, process.ppid, len(process.samples))
54 print('-------------------')
55
56 self.assertEqual(tokens[0], str(process.pid // 1000))
57 self.assertEqual(tokens[1], str(process.cmd))
58 self.assertEqual(tokens[2], str(process.ppid // 1000))
59 self.assertEqual(tokens[3], str(len(process.samples)))
60 ps_data.close()
61
62 def testparseProcDiskStatLog(self):
63 trace = parsing.Trace(writer, args, options)
64 state_with_headers = parsing.parse_file(writer, trace, self.mk_fname('header'))
65 state_with_headers.headers['system.cpu'] = 'xxx (2)'
66 samples = parsing.parse_file(writer, state_with_headers, self.mk_fname('proc_diskstats.log')).disk_stats
67 self.assertEqual(141, len(samples))
68
69 diskstats_data = open(self.mk_fname('extract.proc_diskstats.log'))
70 for index, line in enumerate(diskstats_data):
71 tokens = line.split('\t')
72 sample = samples[index]
73 if debug:
74 print(line.rstrip())
75 print(sample)
76 print('-------------------')
77
78 self.assertEqual(tokens[0], str(sample.time))
79 self.assert_(floatEq(float(tokens[1]), sample.read))
80 self.assert_(floatEq(float(tokens[2]), sample.write))
81 self.assert_(floatEq(float(tokens[3]), sample.util))
82 diskstats_data.close()
83
84 def testparseProcStatLog(self):
85 trace = parsing.Trace(writer, args, options)
86 samples = parsing.parse_file(writer, trace, self.mk_fname('proc_stat.log')).cpu_stats
87 self.assertEqual(141, len(samples))
88
89 stat_data = open(self.mk_fname('extract.proc_stat.log'))
90 for index, line in enumerate(stat_data):
91 tokens = line.split('\t')
92 sample = samples[index]
93 if debug:
94 print(line.rstrip())
95 print(sample)
96 print('-------------------')
97 self.assert_(floatEq(float(tokens[0]), sample.time))
98 self.assert_(floatEq(float(tokens[1]), sample.user))
99 self.assert_(floatEq(float(tokens[2]), sample.sys))
100 self.assert_(floatEq(float(tokens[3]), sample.io))
101 stat_data.close()
102
103if __name__ == '__main__':
104 unittest.main()
105
diff --git a/scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py b/scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py
new file mode 100644
index 0000000000..6f46a1c03d
--- /dev/null
+++ b/scripts/pybootchartgui/pybootchartgui/tests/process_tree_test.py
@@ -0,0 +1,92 @@
1import sys
2import os
3import unittest
4
5sys.path.insert(0, os.getcwd())
6
7import pybootchartgui.parsing as parsing
8import pybootchartgui.process_tree as process_tree
9import pybootchartgui.main as main
10
11if sys.version_info >= (3, 0):
12 long = int
13
14class TestProcessTree(unittest.TestCase):
15
16 def setUp(self):
17 self.name = "Process tree unittest"
18 self.rootdir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/')
19
20 parser = main._mk_options_parser()
21 options, args = parser.parse_args(['--q', self.rootdir])
22 writer = main._mk_writer(options)
23 trace = parsing.Trace(writer, args, options)
24
25 parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log'))
26 trace.compile(writer)
27 self.processtree = process_tree.ProcessTree(writer, None, trace.ps_stats, \
28 trace.ps_stats.sample_period, None, options.prune, None, None, False, for_testing = True)
29
30 def mk_fname(self,f):
31 return os.path.join(self.rootdir, f)
32
33 def flatten(self, process_tree):
34 flattened = []
35 for p in process_tree:
36 flattened.append(p)
37 flattened.extend(self.flatten(p.child_list))
38 return flattened
39
40 def checkAgainstJavaExtract(self, filename, process_tree):
41 test_data = open(filename)
42 for expected, actual in zip(test_data, self.flatten(process_tree)):
43 tokens = expected.split('\t')
44 self.assertEqual(int(tokens[0]), actual.pid // 1000)
45 self.assertEqual(tokens[1], actual.cmd)
46 self.assertEqual(long(tokens[2]), 10 * actual.start_time)
47 self.assert_(long(tokens[3]) - 10 * actual.duration < 5, "duration")
48 self.assertEqual(int(tokens[4]), len(actual.child_list))
49 self.assertEqual(int(tokens[5]), len(actual.samples))
50 test_data.close()
51
52 def testBuild(self):
53 process_tree = self.processtree.process_tree
54 self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.1.log'), process_tree)
55
56 def testMergeLogger(self):
57 self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
58 process_tree = self.processtree.process_tree
59 self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.2.log'), process_tree)
60
61 def testPrune(self):
62 self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
63 self.processtree.prune(self.processtree.process_tree, None)
64 process_tree = self.processtree.process_tree
65 self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3b.log'), process_tree)
66
67 def testMergeExploders(self):
68 self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
69 self.processtree.prune(self.processtree.process_tree, None)
70 self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
71 process_tree = self.processtree.process_tree
72 self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3c.log'), process_tree)
73
74 def testMergeSiblings(self):
75 self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
76 self.processtree.prune(self.processtree.process_tree, None)
77 self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
78 self.processtree.merge_siblings(self.processtree.process_tree)
79 process_tree = self.processtree.process_tree
80 self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3d.log'), process_tree)
81
82 def testMergeRuns(self):
83 self.processtree.merge_logger(self.processtree.process_tree, 'bootchartd', None, False)
84 self.processtree.prune(self.processtree.process_tree, None)
85 self.processtree.merge_exploders(self.processtree.process_tree, set(['hwup']))
86 self.processtree.merge_siblings(self.processtree.process_tree)
87 self.processtree.merge_runs(self.processtree.process_tree)
88 process_tree = self.processtree.process_tree
89 self.checkAgainstJavaExtract(self.mk_fname('extract.processtree.3e.log'), process_tree)
90
91if __name__ == '__main__':
92 unittest.main()