diff options
Diffstat (limited to 'meta/lib/oeqa/utils/decorators.py')
-rw-r--r-- | meta/lib/oeqa/utils/decorators.py | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/meta/lib/oeqa/utils/decorators.py b/meta/lib/oeqa/utils/decorators.py new file mode 100644 index 0000000000..40bd4ef2db --- /dev/null +++ b/meta/lib/oeqa/utils/decorators.py | |||
@@ -0,0 +1,158 @@ | |||
1 | # Copyright (C) 2013 Intel Corporation | ||
2 | # | ||
3 | # Released under the MIT license (see COPYING.MIT) | ||
4 | |||
5 | # Some custom decorators that can be used by unittests | ||
6 | # Most useful is skipUnlessPassed which can be used for | ||
7 | # creating dependecies between two test methods. | ||
8 | |||
9 | import os | ||
10 | import logging | ||
11 | import sys | ||
12 | import unittest | ||
13 | |||
14 | #get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame | ||
15 | class getResults(object): | ||
16 | def __init__(self): | ||
17 | #dynamically determine the unittest.case frame and use it to get the name of the test method | ||
18 | upperf = sys._current_frames().values()[0] | ||
19 | while (upperf.f_globals['__name__'] != 'unittest.case'): | ||
20 | upperf = upperf.f_back | ||
21 | |||
22 | def handleList(items): | ||
23 | ret = [] | ||
24 | # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception()) | ||
25 | for i in items: | ||
26 | s = i[0].id() | ||
27 | #Handle the _ErrorHolder objects from skipModule failures | ||
28 | if "setUpModule (" in s: | ||
29 | ret.append(s.replace("setUpModule (", "").replace(")","")) | ||
30 | else: | ||
31 | ret.append(s) | ||
32 | return ret | ||
33 | self.faillist = handleList(upperf.f_locals['result'].failures) | ||
34 | self.errorlist = handleList(upperf.f_locals['result'].errors) | ||
35 | self.skiplist = handleList(upperf.f_locals['result'].skipped) | ||
36 | |||
37 | def getFailList(self): | ||
38 | return self.faillist | ||
39 | |||
40 | def getErrorList(self): | ||
41 | return self.errorlist | ||
42 | |||
43 | def getSkipList(self): | ||
44 | return self.skiplist | ||
45 | |||
46 | class skipIfFailure(object): | ||
47 | |||
48 | def __init__(self,testcase): | ||
49 | self.testcase = testcase | ||
50 | |||
51 | def __call__(self,f): | ||
52 | def wrapped_f(*args): | ||
53 | res = getResults() | ||
54 | if self.testcase in (res.getFailList() or res.getErrorList()): | ||
55 | raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) | ||
56 | return f(*args) | ||
57 | wrapped_f.__name__ = f.__name__ | ||
58 | return wrapped_f | ||
59 | |||
60 | class skipIfSkipped(object): | ||
61 | |||
62 | def __init__(self,testcase): | ||
63 | self.testcase = testcase | ||
64 | |||
65 | def __call__(self,f): | ||
66 | def wrapped_f(*args): | ||
67 | res = getResults() | ||
68 | if self.testcase in res.getSkipList(): | ||
69 | raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) | ||
70 | return f(*args) | ||
71 | wrapped_f.__name__ = f.__name__ | ||
72 | return wrapped_f | ||
73 | |||
74 | class skipUnlessPassed(object): | ||
75 | |||
76 | def __init__(self,testcase): | ||
77 | self.testcase = testcase | ||
78 | |||
79 | def __call__(self,f): | ||
80 | def wrapped_f(*args): | ||
81 | res = getResults() | ||
82 | if self.testcase in res.getSkipList() or \ | ||
83 | self.testcase in res.getFailList() or \ | ||
84 | self.testcase in res.getErrorList(): | ||
85 | raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) | ||
86 | return f(*args) | ||
87 | wrapped_f.__name__ = f.__name__ | ||
88 | return wrapped_f | ||
89 | |||
90 | class testcase(object): | ||
91 | |||
92 | def __init__(self, test_case): | ||
93 | self.test_case = test_case | ||
94 | |||
95 | def __call__(self, func): | ||
96 | def wrapped_f(*args): | ||
97 | return func(*args) | ||
98 | wrapped_f.test_case = self.test_case | ||
99 | return wrapped_f | ||
100 | |||
101 | class NoParsingFilter(logging.Filter): | ||
102 | def filter(self, record): | ||
103 | return record.levelno == 100 | ||
104 | |||
105 | def LogResults(original_class): | ||
106 | orig_method = original_class.run | ||
107 | |||
108 | #rewrite the run method of unittest.TestCase to add testcase logging | ||
109 | def run(self, result, *args, **kws): | ||
110 | orig_method(self, result, *args, **kws) | ||
111 | passed = True | ||
112 | testMethod = getattr(self, self._testMethodName) | ||
113 | |||
114 | #if test case is decorated then use it's number, else use it's name | ||
115 | try: | ||
116 | test_case = testMethod.test_case | ||
117 | except AttributeError: | ||
118 | test_case = self._testMethodName | ||
119 | |||
120 | #create custom logging level for filtering. | ||
121 | custom_log_level = 100 | ||
122 | logging.addLevelName(custom_log_level, 'RESULTS') | ||
123 | caller = os.path.basename(sys.argv[0]) | ||
124 | |||
125 | def results(self, message, *args, **kws): | ||
126 | if self.isEnabledFor(custom_log_level): | ||
127 | self.log(custom_log_level, message, *args, **kws) | ||
128 | logging.Logger.results = results | ||
129 | |||
130 | logging.basicConfig(filename=os.path.join(os.getcwd(),'results-'+caller+'.log'), | ||
131 | filemode='w', | ||
132 | format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | ||
133 | datefmt='%H:%M:%S', | ||
134 | level=custom_log_level) | ||
135 | for handler in logging.root.handlers: | ||
136 | handler.addFilter(NoParsingFilter()) | ||
137 | local_log = logging.getLogger(caller) | ||
138 | |||
139 | #check status of tests and record it | ||
140 | for (name, msg) in result.errors: | ||
141 | if self._testMethodName == str(name).split(' ')[0]: | ||
142 | local_log.results("Testcase "+str(test_case)+": ERROR") | ||
143 | local_log.results("Testcase "+str(test_case)+":\n"+msg) | ||
144 | passed = False | ||
145 | for (name, msg) in result.failures: | ||
146 | if self._testMethodName == str(name).split(' ')[0]: | ||
147 | local_log.results("Testcase "+str(test_case)+": FAILED") | ||
148 | local_log.results("Testcase "+str(test_case)+":\n"+msg) | ||
149 | passed = False | ||
150 | for (name, msg) in result.skipped: | ||
151 | if self._testMethodName == str(name).split(' ')[0]: | ||
152 | local_log.results("Testcase "+str(test_case)+": SKIPPED") | ||
153 | passed = False | ||
154 | if passed: | ||
155 | local_log.results("Testcase "+str(test_case)+": PASSED") | ||
156 | |||
157 | original_class.run = run | ||
158 | return original_class | ||