import sys, os, re, struct, operator, math from collections import defaultdict import unittest sys.path.insert(0, os.getcwd()) import pybootchartgui.parsing as parsing import pybootchartgui.main as main debug = False def floatEq(f1, f2): return math.fabs(f1-f2) < 0.00001 bootchart_dir = os.path.join(os.path.dirname(sys.argv[0]), '../../examples/1/') parser = main._mk_options_parser() options, args = parser.parse_args(['--q', bootchart_dir]) writer = main._mk_writer(options) class TestBCParser(unittest.TestCase): def setUp(self): self.name = "My first unittest" self.rootdir = bootchart_dir def mk_fname(self,f): return os.path.join(self.rootdir, f) def testParseHeader(self): trace = parsing.Trace(writer, args, options) state = parsing.parse_file(writer, trace, self.mk_fname('header')) self.assertEqual(6, len(state.headers)) self.assertEqual(2, parsing.get_num_cpus(state.headers)) def test_parseTimedBlocks(self): trace = parsing.Trace(writer, args, options) state = parsing.parse_file(writer, trace, self.mk_fname('proc_diskstats.log')) self.assertEqual(141, len(state.disk_stats)) def testParseProcPsLog(self): trace = parsing.Trace(writer, args, options) state = parsing.parse_file(writer, trace, self.mk_fname('proc_ps.log')) samples = state.ps_stats processes = samples.process_map sorted_processes = [processes[k] for k in sorted(processes.keys())] ps_data = open(self.mk_fname('extract2.proc_ps.log')) for index, line in enumerate(ps_data): tokens = line.split(); process = sorted_processes[index] if debug: print(tokens[0:4]) print(process.pid / 1000, process.cmd, process.ppid, len(process.samples)) print('-------------------') self.assertEqual(tokens[0], str(process.pid // 1000)) self.assertEqual(tokens[1], str(process.cmd)) self.assertEqual(tokens[2], str(process.ppid // 1000)) self.assertEqual(tokens[3], str(len(process.samples))) ps_data.close() def testparseProcDiskStatLog(self): trace = parsing.Trace(writer, args, options) state_with_headers = parsing.parse_file(writer, trace, self.mk_fname('header')) state_with_headers.headers['system.cpu'] = 'xxx (2)' samples = parsing.parse_file(writer, state_with_headers, self.mk_fname('proc_diskstats.log')).disk_stats self.assertEqual(141, len(samples)) diskstats_data = open(self.mk_fname('extract.proc_diskstats.log')) for index, line in enumerate(diskstats_data): tokens = line.split('\t') sample = samples[index] if debug: print(line.rstrip()) print(sample) print('-------------------') self.assertEqual(tokens[0], str(sample.time)) self.assert_(floatEq(float(tokens[1]), sample.read)) self.assert_(floatEq(float(tokens[2]), sample.write)) self.assert_(floatEq(float(tokens[3]), sample.util)) diskstats_data.close() def testparseProcStatLog(self): trace = parsing.Trace(writer, args, options) samples = parsing.parse_file(writer, trace, self.mk_fname('proc_stat.log')).cpu_stats self.assertEqual(141, len(samples)) stat_data = open(self.mk_fname('extract.proc_stat.log')) for index, line in enumerate(stat_data): tokens = line.split('\t') sample = samples[index] if debug: print(line.rstrip()) print(sample) print('-------------------') self.assert_(floatEq(float(tokens[0]), sample.time)) self.assert_(floatEq(float(tokens[1]), sample.user)) self.assert_(floatEq(float(tokens[2]), sample.sys)) self.assert_(floatEq(float(tokens[3]), sample.io)) stat_data.close() if __name__ == '__main__': unittest.main()