diff options
Diffstat (limited to 'scripts/pybootchartgui/pybootchartgui/tests/parser_test.py')
-rw-r--r-- | scripts/pybootchartgui/pybootchartgui/tests/parser_test.py | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py b/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py new file mode 100644 index 0000000000..00fb3bf797 --- /dev/null +++ b/scripts/pybootchartgui/pybootchartgui/tests/parser_test.py | |||
@@ -0,0 +1,105 @@ | |||
1 | import sys, os, re, struct, operator, math | ||
2 | from collections import defaultdict | ||
3 | import unittest | ||
4 | |||
5 | sys.path.insert(0, os.getcwd()) | ||
6 | |||
7 | import pybootchartgui.parsing as parsing | ||
8 | import pybootchartgui.main as main | ||
9 | |||
10 | debug = False | ||
11 | |||
12 | def floatEq(f1, f2): | ||
13 | return math.fabs(f1-f2) < 0.00001 | ||
14 | |||
15 | bootchart_dir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/') | ||
16 | parser = main._mk_options_parser() | ||
17 | options, args = parser.parse_args(['--q', bootchart_dir]) | ||
18 | writer = main._mk_writer(options) | ||
19 | |||
20 | class TestBCParser(unittest.TestCase): | ||
21 | |||
22 | def setUp(self): | ||
23 | self.name = "My first unittest" | ||
24 | self.rootdir = bootchart_dir | ||
25 | |||
26 | def mk_fname(self,f): | ||
27 | return os.path.join(self.rootdir, f) | ||
28 | |||
29 | def testParseHeader(self): | ||
30 | trace = parsing.Trace(writer, args, options) | ||
31 | state = parsing.parse_file(writer, trace, self.mk_fname('header')) | ||
32 | self.assertEqual(6, len(state.headers)) | ||
33 | self.assertEqual(2, parsing.get_num_cpus(state.headers)) | ||
34 | |||
35 | def test_parseTimedBlocks(self): | ||
36 | trace = parsing.Trace(writer, args, options) | ||
37 | state = parsing.parse_file(writer, trace, self.mk_fname('proc_diskstats.log')) | ||
38 | self.assertEqual(141, len(state.disk_stats)) | ||
39 | |||
40 | def testParseProcPsLog(self): | ||
41 | trace = parsing.Trace(writer, args, options) | ||
42 | state = parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log')) | ||
43 | samples = state.ps_stats | ||
44 | processes = samples.process_map | ||
45 | sorted_processes = [processes[k] for k in sorted(processes.keys())] | ||
46 | |||
47 | ps_data = open(self.mk_fname('extract2.proc_ps.log')) | ||
48 | for index, line in enumerate(ps_data): | ||
49 | tokens = line.split(); | ||
50 | process = sorted_processes[index] | ||
51 | if debug: | ||
52 | print(tokens[0:4]) | ||
53 | print(process.pid / 1000, process.cmd, process.ppid, len(process.samples)) | ||
54 | print('-------------------') | ||
55 | |||
56 | self.assertEqual(tokens[0], str(process.pid // 1000)) | ||
57 | self.assertEqual(tokens[1], str(process.cmd)) | ||
58 | self.assertEqual(tokens[2], str(process.ppid // 1000)) | ||
59 | self.assertEqual(tokens[3], str(len(process.samples))) | ||
60 | ps_data.close() | ||
61 | |||
62 | def testparseProcDiskStatLog(self): | ||
63 | trace = parsing.Trace(writer, args, options) | ||
64 | state_with_headers = parsing.parse_file(writer, trace, self.mk_fname('header')) | ||
65 | state_with_headers.headers['system.cpu'] = 'xxx (2)' | ||
66 | samples = parsing.parse_file(writer, state_with_headers, self.mk_fname('proc_diskstats.log')).disk_stats | ||
67 | self.assertEqual(141, len(samples)) | ||
68 | |||
69 | diskstats_data = open(self.mk_fname('extract.proc_diskstats.log')) | ||
70 | for index, line in enumerate(diskstats_data): | ||
71 | tokens = line.split('\t') | ||
72 | sample = samples[index] | ||
73 | if debug: | ||
74 | print(line.rstrip()) | ||
75 | print(sample) | ||
76 | print('-------------------') | ||
77 | |||
78 | self.assertEqual(tokens[0], str(sample.time)) | ||
79 | self.assert_(floatEq(float(tokens[1]), sample.read)) | ||
80 | self.assert_(floatEq(float(tokens[2]), sample.write)) | ||
81 | self.assert_(floatEq(float(tokens[3]), sample.util)) | ||
82 | diskstats_data.close() | ||
83 | |||
84 | def testparseProcStatLog(self): | ||
85 | trace = parsing.Trace(writer, args, options) | ||
86 | samples = parsing.parse_file(writer, trace, self.mk_fname('proc_stat.log')).cpu_stats | ||
87 | self.assertEqual(141, len(samples)) | ||
88 | |||
89 | stat_data = open(self.mk_fname('extract.proc_stat.log')) | ||
90 | for index, line in enumerate(stat_data): | ||
91 | tokens = line.split('\t') | ||
92 | sample = samples[index] | ||
93 | if debug: | ||
94 | print(line.rstrip()) | ||
95 | print(sample) | ||
96 | print('-------------------') | ||
97 | self.assert_(floatEq(float(tokens[0]), sample.time)) | ||
98 | self.assert_(floatEq(float(tokens[1]), sample.user)) | ||
99 | self.assert_(floatEq(float(tokens[2]), sample.sys)) | ||
100 | self.assert_(floatEq(float(tokens[3]), sample.io)) | ||
101 | stat_data.close() | ||
102 | |||
103 | if __name__ == '__main__': | ||
104 | unittest.main() | ||
105 | |||