# # SPDX-License-Identifier: MIT # import base64 import logging import os from threading import Lock from . import OETarget class OESerialTarget(OETarget): def __init__(self, logger, target_ip, server_ip, server_port=0, timeout=300, serialcontrol_cmd=None, serialcontrol_extra_args=None, serialcontrol_ps1=None, serialcontrol_connect_timeout=None, machine=None, **kwargs): if not logger: logger = logging.getLogger('target') logger.setLevel(logging.INFO) filePath = os.path.join(os.getcwd(), 'remoteTarget.log') fileHandler = logging.FileHandler(filePath, 'w', 'utf-8') formatter = logging.Formatter( '%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', '%H:%M:%S') fileHandler.setFormatter(formatter) logger.addHandler(fileHandler) super(OESerialTarget, self).__init__(logger) if serialcontrol_ps1: self.target_ps1 = serialcontrol_ps1 elif machine: # fallback to a default value which assumes root@machine self.target_ps1 = f'root@{machine}:.*# ' else: raise ValueError("Unable to determine shell command prompt (PS1) format.") if not serialcontrol_cmd: raise ValueError("Unable to determine serial control command.") if serialcontrol_extra_args: self.connection_script = f'{serialcontrol_cmd} {serialcontrol_extra_args}' else: self.connection_script = serialcontrol_cmd if serialcontrol_connect_timeout: self.connect_timeout = serialcontrol_connect_timeout else: self.connect_timeout = 10 # default to 10s connection timeout self.default_command_timeout = timeout self.ip = target_ip self.server_ip = server_ip self.server_port = server_port self.conn = None self.mutex = Lock() def start(self, **kwargs): pass def stop(self, **kwargs): pass def get_connection(self): if self.conn is None: self.conn = SerialConnection(self.connection_script, self.target_ps1, self.connect_timeout, self.default_command_timeout) return self.conn def run(self, cmd, timeout=None): """ Runs command on target over the provided serial connection. The first call will open the connection, and subsequent calls will re-use the same connection to send new commands. command: Command to run on target. timeout: : Kill command after seconds. None: Kill command default value seconds. 0: No timeout, runs until return. """ # Lock needed to avoid multiple threads running commands concurrently # A serial connection can only be used by one caller at a time with self.mutex: conn = self.get_connection() self.logger.debug(f"[Running]$ {cmd}") # Run the command, then echo $? to get the command's return code try: output = conn.run_command(cmd, timeout) status = conn.run_command("echo $?") self.logger.debug(f" [stdout]: {output}") self.logger.debug(f" [ret code]: {status}\n\n") except SerialTimeoutException as e: self.logger.debug(e) output = "" status = 255 # Return to $HOME after each command to simulate a stateless SSH connection conn.run_command('cd "$HOME"') return (int(status), output) def copyTo(self, localSrc, remoteDst): """ Copies files by converting them to base 32, then transferring the ASCII text to the target, and decoding it in place on the target. On a 115k baud serial connection, this method transfers at roughly 30kbps. """ with open(localSrc, 'rb') as file: data = file.read() b32 = base64.b32encode(data).decode('utf-8') # To avoid shell line limits, send a chunk at a time SPLIT_LEN = 512 lines = [b32[i:i+SPLIT_LEN] for i in range(0, len(b32), SPLIT_LEN)] with self.mutex: conn = self.get_connection() filename = os.path.basename(localSrc) TEMP = f'/tmp/{filename}.b32' # Create or empty out the temp file conn.run_command(f'echo -n "" > {TEMP}') for line in lines: conn.run_command(f'echo -n {line} >> {TEMP}') # Check to see whether the remoteDst is a directory is_directory = conn.run_command(f'[[ -d {remoteDst} ]]; echo $?') if int(is_directory) == 0: # append the localSrc filename to the end of remoteDst remoteDst = os.path.join(remoteDst, filename) conn.run_command(f'base32 -d {TEMP} > {remoteDst}') conn.run_command(f'rm {TEMP}') return 0, 'Success' def copyFrom(self, remoteSrc, localDst): """ Copies files by converting them to base 32 on the target, then transferring the ASCII text to the host. That text is then decoded here and written out to the destination. On a 115k baud serial connection, this method transfers at roughly 30kbps. """ with self.mutex: b32 = self.get_connection().run_command(f'base32 {remoteSrc}') data = base64.b32decode(b32.replace('\r\n', '')) # If the local path is a directory, get the filename from # the remoteSrc path and append it to localDst if os.path.isdir(localDst): filename = os.path.basename(remoteSrc) localDst = os.path.join(localDst, filename) with open(localDst, 'wb') as file: file.write(data) return 0, 'Success' def copyDirTo(self, localSrc, remoteDst): """ Copy recursively localSrc directory to remoteDst in target. """ for root, dirs, files in os.walk(localSrc): # Create directories in the target as needed for d in dirs: tmpDir = os.path.join(root, d).replace(localSrc, "") newDir = os.path.join(remoteDst, tmpDir.lstrip("/")) cmd = "mkdir -p %s" % newDir self.run(cmd) # Copy files into the target for f in files: tmpFile = os.path.join(root, f).replace(localSrc, "") dstFile = os.path.join(remoteDst, tmpFile.lstrip("/")) srcFile = os.path.join(root, f) self.copyTo(srcFile, dstFile) def deleteFiles(self, remotePath, files): """ Deletes files in target's remotePath. """ cmd = "rm" if not isinstance(files, list): files = [files] for f in files: cmd = "%s %s" % (cmd, os.path.join(remotePath, f)) self.run(cmd) def deleteDir(self, remotePath): """ Deletes target's remotePath directory. """ cmd = "rmdir %s" % remotePath self.run(cmd) def deleteDirStructure(self, localPath, remotePath): """ Delete recursively localPath structure directory in target's remotePath. This function is useful to delete a package that is installed in the device under test (DUT) and the host running the test has such package extracted in tmp directory. Example: pwd: /home/user/tmp tree: . └── work ├── dir1 │   └── file1 └── dir2 localpath = "/home/user/tmp" and remotepath = "/home/user" With the above variables this function will try to delete the directory in the DUT in this order: /home/user/work/dir1/file1 /home/user/work/dir1 (if dir is empty) /home/user/work/dir2 (if dir is empty) /home/user/work (if dir is empty) """ for root, dirs, files in os.walk(localPath, topdown=False): # Delete files first tmpDir = os.path.join(root).replace(localPath, "") remoteDir = os.path.join(remotePath, tmpDir.lstrip("/")) self.deleteFiles(remoteDir, files) # Remove dirs if empty for d in dirs: tmpDir = os.path.join(root, d).replace(localPath, "") remoteDir = os.path.join(remotePath, tmpDir.lstrip("/")) self.deleteDir(remoteDir) class SerialTimeoutException(Exception): def __init__(self, msg): self.msg = msg def __str__(self): return self.msg class SerialConnection: def __init__(self, script, target_prompt, connect_timeout, default_command_timeout): import pexpect # limiting scope to avoid build dependency self.prompt = target_prompt self.connect_timeout = connect_timeout self.default_command_timeout = default_command_timeout self.conn = pexpect.spawn('/bin/bash', ['-c', script], encoding='utf8') self._seek_to_clean_shell() # Disable echo to avoid the need to parse the outgoing command self.run_command('stty -echo') def _seek_to_clean_shell(self): """ Attempts to find a clean shell, meaning it is clear and ready to accept a new command. This is necessary to ensure the correct output is captured from each command. """ import pexpect # limiting scope to avoid build dependency # Look for a clean shell # Wait a short amount of time for the connection to finish pexpect_code = self.conn.expect([self.prompt, pexpect.TIMEOUT], timeout=self.connect_timeout) # if a timeout occurred, send an empty line and wait for a clean shell if pexpect_code == 1: # send a newline to clear and present the shell self.conn.sendline("") pexpect_code = self.conn.expect(self.prompt) def run_command(self, cmd, timeout=None): """ Runs command on target over the provided serial connection. Returns any output on the shell while the command was run. command: Command to run on target. timeout: : Kill command after seconds. None: Kill command default value seconds. 0: No timeout, runs until return. """ import pexpect # limiting scope to avoid build dependency # Convert from the OETarget defaults to pexpect timeout values if timeout is None: timeout = self.default_command_timeout elif timeout == 0: timeout = None # passing None to pexpect is infinite timeout self.conn.sendline(cmd) pexpect_code = self.conn.expect([self.prompt, pexpect.TIMEOUT], timeout=timeout) # check for timeout if pexpect_code == 1: self.conn.send('\003') # send Ctrl+C self._seek_to_clean_shell() raise SerialTimeoutException(f'Timeout executing: {cmd} after {timeout}s') return self.conn.before.removesuffix('\r\n')