# ex:ts=4:sw=4:sts=4:et # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- # # utils: common methods used by the patchtest framework # # Copyright (C) 2016 Intel Corporation # # SPDX-License-Identifier: GPL-2.0-only # import os import subprocess import logging import re import mailbox class CmdException(Exception): """ Simple exception class where its attributes are the ones passed when instantiated """ def __init__(self, cmd): self._cmd = cmd def __getattr__(self, name): value = None if self._cmd.has_key(name): value = self._cmd[name] return value def exec_cmd(cmd, cwd, ignore_error=False, input=None, strip=True, updateenv={}): """ Input: cmd: dict containing the following keys: cmd : the command itself as an array of strings ignore_error: if False, no exception is raised strip: indicates if strip is done on the output (stdout and stderr) input: input data to the command (stdin) updateenv: environment variables to be appended to the current process environment variables NOTE: keys 'ignore_error' and 'input' are optional; if not included, the defaults are the ones specify in the arguments cwd: directory where commands are executed ignore_error: raise CmdException if command fails to execute and this value is False input: input data (stdin) for the command Output: dict containing the following keys: cmd: the same as input ignore_error: the same as input strip: the same as input input: the same as input stdout: Standard output after command's execution stderr: Standard error after command's execution returncode: Return code after command's execution """ cmddefaults = { 'cmd':'', 'ignore_error':ignore_error, 'strip':strip, 'input':input, 'updateenv':updateenv, } # update input values if necessary cmddefaults.update(cmd) _cmd = cmddefaults if not _cmd['cmd']: raise CmdException({'cmd':None, 'stderr':'no command given'}) # update the environment env = os.environ env.update(_cmd['updateenv']) _command = [e for e in _cmd['cmd']] p = subprocess.Popen(_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, cwd=cwd, env=env) # execute the command and strip output (_stdout, _stderr) = p.communicate(_cmd['input']) if _cmd['strip']: _stdout, _stderr = map(str.strip, [_stdout, _stderr]) # generate the result result = _cmd result.update({'cmd':_command,'stdout':_stdout,'stderr':_stderr,'returncode':p.returncode}) # launch exception if necessary if not _cmd['ignore_error'] and p.returncode: raise CmdException(result) return result def exec_cmds(cmds, cwd): """ Executes commands Input: cmds: Array of commands cwd: directory where commands are executed Output: Array of output commands """ results = [] _cmds = cmds for cmd in _cmds: result = exec_cmd(cmd, cwd) results.append(result) return results def logger_create(name): logger = logging.getLogger(name) loggerhandler = logging.StreamHandler() loggerhandler.setFormatter(logging.Formatter("%(message)s")) logger.addHandler(loggerhandler) logger.setLevel(logging.INFO) return logger def get_subject_prefix(path): prefix = "" mbox = mailbox.mbox(path) if len(mbox): subject = mbox[0]['subject'] if subject: pattern = re.compile(r"(\[.*\])", re.DOTALL) match = pattern.search(subject) if match: prefix = match.group(1) return prefix def valid_branch(branch): """ Check if branch is valid name """ lbranch = branch.lower() invalid = lbranch.startswith('patch') or \ lbranch.startswith('rfc') or \ lbranch.startswith('resend') or \ re.search(r'^v\d+', lbranch) or \ re.search(r'^\d+/\d+', lbranch) return not invalid def get_branch(path): """ Get the branch name from mbox """ fullprefix = get_subject_prefix(path) branch, branches, valid_branches = None, [], [] if fullprefix: prefix = fullprefix.strip('[]') branches = [ b.strip() for b in prefix.split(',')] valid_branches = [b for b in branches if valid_branch(b)] if len(valid_branches): branch = valid_branches[0] return branch