#!/usr/bin/env python from __future__ import print_function import os, sys, re import argparse, time import signal, atexit from subprocess import Popen, STDOUT, PIPE from select import select # Pseudo-TTY and terminal manipulation import pty, array, fcntl, termios IS_PY_3 = sys.version_info[0] == 3 verbose = 0 debug_file = None log_file = None def debug(data): if debug_file: debug_file.write(data) debug_file.flush() def log(data, verbosity=0, end='\n'): if log_file: log_file.write(data + end) log_file.flush() if verbose >= verbosity: print(data, end=end) sys.stdout.flush() def vlog(data, end='\n'): log(data, verbosity=1, end=end) def vvlog(data, end='\n'): log(data, verbosity=2, end=end) sep = "\n" rundir = None parser = argparse.ArgumentParser( description="Run a test file against a Mal implementation") parser.add_argument('-v', '--verbose', action='count', default=0, help="verbose output; repeat to increase verbosity") parser.add_argument('--rundir', help="change to the directory before running tests") parser.add_argument('--start-timeout', default=10, type=int, help="default timeout for initial prompt") parser.add_argument('--test-timeout', default=20, type=int, help="default timeout for each individual test action") parser.add_argument('--pre-eval', default=None, type=str, help="Mal code to evaluate prior to running the test") parser.add_argument('--no-pty', action='store_true', help="Use direct pipes instead of pseudo-tty") parser.add_argument('--log-file', type=str, help="Write messages to the named file in addition the screen") parser.add_argument('--debug-file', type=str, help="Write all test interactions to the named file") parser.add_argument('--hard', action='store_true', help="Turn soft tests (soft, deferrable, optional) into hard failures") parser.add_argument('--continue-after-fail', action='store_true', help="Run all tests in a test file even if there are failures") # Control whether deferrable and optional tests are executed parser.add_argument('--deferrable', dest='deferrable', action='store_true', help="Enable deferrable tests that follow a ';>>> deferrable=True'") parser.add_argument('--no-deferrable', dest='deferrable', action='store_false', help="Disable deferrable tests that follow a ';>>> deferrable=True'") parser.set_defaults(deferrable=True) parser.add_argument('--optional', dest='optional', action='store_true', help="Enable optional tests that follow a ';>>> optional=True'") parser.add_argument('--no-optional', dest='optional', action='store_false', help="Disable optional tests that follow a ';>>> optional=True'") parser.set_defaults(optional=True) parser.add_argument('test_file', type=str, help="a test file formatted as with mal test data") parser.add_argument('mal_cmd', nargs="*", help="Mal implementation command line. Use '--' to " "specify a Mal command line with dashed options.") parser.add_argument('--crlf', dest='crlf', action='store_true', help="Write \\r\\n instead of \\n to the input") class Runner(): def __init__(self, args, no_pty=False, line_break="\n"): #print "args: %s" % repr(args) self.no_pty = no_pty # Cleanup child process on exit atexit.register(self.cleanup) self.p = None env = os.environ env['TERM'] = 'dumb' env['INPUTRC'] = '/dev/null' env['PERL_RL'] = 'false' if no_pty: self.p = Popen(args, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=STDOUT, preexec_fn=os.setsid, env=env) self.stdin = self.p.stdin self.stdout = self.p.stdout else: # provide tty to get 'interactive' readline to work master, slave = pty.openpty() # Set terminal size large so that readline will not send # ANSI/VT escape codes when the lines are long. buf = array.array('h', [100, 200, 0, 0]) fcntl.ioctl(master, termios.TIOCSWINSZ, buf, True) self.p = Popen(args, bufsize=0, stdin=slave, stdout=slave, stderr=STDOUT, preexec_fn=os.setsid, env=env) # Now close slave so that we will get an exception from # read when the child exits early # http://stackoverflow.com/questions/11165521 os.close(slave) self.stdin = os.fdopen(master, 'r+b', 0) self.stdout = self.stdin #print "started" self.buf = "" self.last_prompt = "" self.line_break = line_break def read_to_prompt(self, prompts, timeout): end_time = time.time() + timeout while time.time() < end_time: [outs,_,_] = select([self.stdout], [], [], 1) if self.stdout in outs: new_data = self.stdout.read(1) new_data = new_data.decode("latin1") if IS_PY_3 else new_data #print("new_data: %s" % repr(new_data)) debug(new_data) # Perform newline cleanup self.buf += new_data.replace("\r", "") if self.buf.endswith('\x1b[6n'): vvlog("Handling ASCII cursor query") self.stdin.write(b"\x1b[1;1R") self.buf = "" continue for prompt in prompts: regexp = re.compile(prompt) match = regexp.search(self.buf) if match: end = match.end() buf = self.buf[0:match.start()] self.buf = self.buf[end:] self.last_prompt = prompt return buf return None def writeline(self, str): def _to_bytes(s): return bytes(s, "latin1") if IS_PY_3 else s data = _to_bytes(str.replace('\r', '\x16\r') + self.line_break) #print("write: %s" % repr(data)) self.stdin.write(data) def cleanup(self): #print "cleaning up" if self.p: try: os.killpg(self.p.pid, signal.SIGTERM) except OSError: pass self.p = None class TestReader: def __init__(self, test_file): self.line_num = 0 f = open(test_file, newline='') if IS_PY_3 else open(test_file) self.data = f.read().split('\n') self.soft = False self.deferrable = False self.optional = False def next(self): self.msg = None self.form = None self.out = "" self.ret = None while self.data: self.line_num += 1 line = self.data.pop(0) if re.match(r"^\s*$", line): # blank line continue elif line[0:3] == ";;;": # ignore comment continue elif line[0:2] == ";;": # output comment self.msg = line[3:] return True elif line[0:5] == ";>>> ": # settings/commands settings = {} exec(line[5:], {}, settings) if 'soft' in settings: self.soft = settings['soft'] if 'deferrable' in settings and settings['deferrable']: self.deferrable = "\nSkipping deferrable and optional tests" return True if 'optional' in settings and settings['optional']: self.optional = "\nSkipping optional tests" return True continue elif line[0:1] == ";": # unexpected comment raise Exception("Test data error at line %d:\n%s" % (self.line_num, line)) self.form = line # the line is a form to send # Now find the output and return value while self.data: line = self.data[0] if line[0:3] == ";=>": self.ret = line[3:] self.line_num += 1 self.data.pop(0) break elif line[0:2] == ";/": self.out = self.out + line[2:] + sep self.line_num += 1 self.data.pop(0) else: self.ret = "" break if self.ret != None: break if self.out[-1:] == sep and not self.ret: # If there is no return value, output should not end in # separator self.out = self.out[0:-1] return self.form args = parser.parse_args(sys.argv[1:]) verbose = args.verbose # Workaround argparse issue with two '--' on command line if sys.argv.count('--') > 0: args.mal_cmd = sys.argv[sys.argv.index('--')+1:] if args.rundir: os.chdir(args.rundir) if args.log_file: log_file = open(args.log_file, "a") if args.debug_file: debug_file = open(args.debug_file, "a") r = Runner(args.mal_cmd, no_pty=args.no_pty, line_break="\r\n" if args.crlf else "\n") t = TestReader(args.test_file) def assert_prompt(runner, prompts, timeout): # Wait for the initial prompt header = runner.read_to_prompt(prompts, timeout=timeout) if not header == None: if header: vvlog("Started with:\n%s" % header) else: log("Did not receive one of following prompt(s): %s" % repr(prompts)) log(" Got : %s" % repr(r.buf)) sys.exit(1) def elide(s, max = 79): """Replace middle of a long string with '...' so length is <= max.""" return s if len(s) <= max else s[:(max-3)//2] + "..." + s[-((max-2)//2):] # Wait for the initial prompt try: assert_prompt(r, ['[^\\s()<>]+> '], args.start_timeout) except: _, exc, _ = sys.exc_info() log("\nException: %s" % repr(exc)) log("Output before exception:\n%s" % r.buf) sys.exit(1) # Send the pre-eval code if any if args.pre_eval: sys.stdout.write("RUNNING pre-eval: %s" % args.pre_eval) r.writeline(args.pre_eval) assert_prompt(r, ['[^\\s()<>]+> '], args.test_timeout) total_test_cnt = 0 test_cnt = 0 pass_cnt = 0 fail_cnt = 0 soft_fail_cnt = 0 failures = [] fail_type = "" class TestTimeout(Exception): pass while t.next(): if args.deferrable == False and t.deferrable: log(t.deferrable) break if args.optional == False and t.optional: log(t.optional) break if t.msg != None: # omit blank test lines unless verbose if verbose or t.msg: log(t.msg) continue if t.form == None: continue total_test_cnt += 1 if fail_type == "TIMED OUT": continue # repl is stuck if not args.continue_after_fail: if fail_cnt > 0: continue # The repeated form is to get around an occasional OS X issue # where the form is repeated. # https://github.com/kanaka/mal/issues/30 expects = [".*%s%s%s" % (sep, t.out, re.escape(t.ret)), ".*%s.*%s%s%s" % (sep, sep, t.out, re.escape(t.ret))] test_msg = "TEST (line %d): %s -> %s" % ( t.line_num, repr(t.form), repr(expects[0])) vlog(test_msg, end='') r.writeline(t.form) try: test_cnt += 1 res = r.read_to_prompt(['\r\n[^\\s()<>]+> ', '\n[^\\s()<>]+> '], timeout=args.test_timeout) #print "%s,%s,%s" % (idx, repr(p.before), repr(p.after)) if (t.ret == "" and t.out == ""): vlog(" -> SUCCESS (result ignored)") pass_cnt += 1 elif res and (re.search(expects[0], res, re.S) or re.search(expects[1], res, re.S)): vlog(" -> SUCCESS") pass_cnt += 1 else: if (res == None): if verbose == 0: log(test_msg, end='') log(" -> TIMED OUT") fail_cnt += 1 fail_type = "TIMED OUT" elif t.soft and not args.hard: vlog(" -> SOFT FAIL:") soft_fail_cnt += 1 fail_type = "SOFT FAILED" else: vlog(" -> FAIL:") fail_cnt += 1 fail_type = "FAILED" expected = " Expected : %s" % repr(expects[0]) got = " Got : %s" % repr(res or "") vvlog(expected) vlog(got if verbose >= 2 else elide(got)) failed_test = "%s %s:\n%s\n%s" % ( fail_type, test_msg, expected, got) failures.append(failed_test) except: _, exc, _ = sys.exc_info() log("\nException: %s" % repr(exc)) log("Output before exception:\n%s" % r.buf) break if len(failures) > 0: log("\nFAILURES:") for f in failures: log(f) results = """ TEST RESULTS (for %s): %3d: soft failing tests %3d: failing tests %3d: passing tests %3d: executed tests %3d: total tests in the file (%d skipped) """ % (args.test_file, soft_fail_cnt, fail_cnt, pass_cnt, test_cnt, total_test_cnt, total_test_cnt - test_cnt) log(results) debug("\n") # add some separate to debug log if fail_cnt > 0: sys.exit(1) sys.exit(0)