mega-test.py 3.83 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
#! /usr/bin/env python

# MEGA TEST
#
# usage:  mega-test.py <config>
#
# This runs several tests in parallel and shows progress bars for each, based on a config file.
#
# <config> is a file containing a list of commands to run along with the expected number of lines
# they will output (to stdout and stderr combined), which is how the progress bar is calculated.
# The format of the file is simply one test per line, with the line containing the test name,
# the number of output lines expected, and the test command.  Example:
#
#     mytest 1523 ./my-test --foo bar
#     another 862 ./another-test --baz
#
# Each command is interpreted by `sh -euc`, therefore it is acceptable to use environment
# variables and other shell syntax.
#
# After all tests complete, the config file will be rewritten to update the line counts to the
# actual number of lines seen for all passing tests (failing tests are not updated).

import sys
import re
import os
from errno import EAGAIN
from fcntl import fcntl, F_GETFL, F_SETFL
from select import poll, POLLIN, POLLHUP
from subprocess import Popen, PIPE, STDOUT

# A config line is "<name> <expected-line-count> <command...>", separated by spaces.
CONFIG_LINE = re.compile("^([^ ]+) +([0-9]+) +(.*)$")

if len(sys.argv) != 2:
  sys.stderr.write("Wrong number of arguments.\n")
  sys.exit(1)

# Per-test logs are written under this directory.  Just try to create it instead of checking
# first: a check-then-create sequence can race with another process creating the directory.
try:
  os.mkdir("/tmp/test-output")
except OSError:
  # An already existing directory is fine.  Anything else (a regular file at that path,
  # missing permissions, ...) is a real error and is re-raised.
  if not os.path.isdir("/tmp/test-output"):
    raise

# Left open here; the config-parsing loop further down reads and closes it.
config = open(sys.argv[1], 'r')

# Test objects, in config-file order.
tests = []

class Test:
  """A single configured test: a shell command plus its expected amount of output.

  The command's combined stdout/stderr is copied to /tmp/test-output/<name>.log while the
  newlines in it are counted.  That count, relative to the expected line count, drives the
  progress bar.

  Attributes:
    name: test name; also the basename of the log file.
    command: command string, handed to `sh -euc`.
    lines: expected number of output lines.
    count: number of newlines read from the command so far.
    done: True once the output hit end-of-file and the process has been waited on.
    proc: the Popen object, or None before start() was called.
    log: the log file object, or None before start() was called.
  """

  def __init__(self, name, command, lines):
    self.name = name
    self.command = command
    self.lines = lines
    self.count = 0
    self.done = False
    self.proc = None
    self.log = None

  def start(self, poller):
    """Launch the command and register its output pipe with `poller`.

    Args:
      poller: a select.poll() object; the command's stdout is registered for POLLIN.
    """
    # Run this instance's own command (not whatever a global named `test` happens to hold),
    # with stdin connected to /dev/null.  The child gets its own copy of the descriptor, so
    # the parent's handle can be closed as soon as Popen returns.
    with open(os.devnull, "r") as devnull:
      self.proc = Popen(["sh", "-euc", self.command],
                        stdin=devnull, stdout=PIPE, stderr=STDOUT)

    # Make the pipe non-blocking so update() can drain it without stalling the other tests.
    fd = self.proc.stdout.fileno()
    flags = fcntl(fd, F_GETFL)
    fcntl(fd, F_SETFL, flags | os.O_NONBLOCK)
    poller.register(self.proc.stdout, POLLIN)

    # Binary mode: the pipe yields raw bytes, which are copied to the log unmodified.
    self.log = open("/tmp/test-output/" + self.name + ".log", "wb")

  def update(self):
    """Drain whatever output is currently available from the command.

    Returns:
      True if end-of-file was reached (the process has then been waited on and the log
      closed); False if the command is still running and no more data is available yet.

    Raises:
      IOError: for any read error other than EAGAIN.
    """
    while True:
      try:
        chunk = self.proc.stdout.read()
      except IOError as e:
        # Python 2 file objects report "no data yet" on a non-blocking pipe as EAGAIN.
        if e.errno == EAGAIN:
          return False
        raise

      # Python 3 buffered readers report "no data yet" by returning None instead.
      if chunk is None:
        return False

      # An empty read means end-of-file: the command closed its output.
      if not chunk:
        self.proc.wait()
        self.done = True
        self.log.close()
        return True

      self.count += chunk.count(b"\n")
      self.log.write(chunk)

  def print_bar(self):
    """Write this test's one-line progress bar to stdout.

    While running, the line ends in a percentage.  Once done it is coloured green with
    "PASS", or red with "FAIL" and the path of the log file.
    """
    # Floor division keeps `percent` an integer on both Python 2 and 3.  An expected count
    # of zero would divide by zero, so show 0% in that case.
    if self.lines > 0:
      percent = self.count * 100 // self.lines
    else:
      percent = 0
    status = "(%3d%%)" % percent

    color_on = ""
    color_off = ""

    if self.done:
      if self.proc.returncode == 0:
        color_on = "\033[0;32m"
        status = "PASS"
      else:
        color_on = "\033[0;31m"
        status = "FAIL: /tmp/test-output/%s.log" % self.name
      color_off = "\033[0m"

    # The bar is 25 characters wide (one '=' per 4%), capped in case the command produced
    # more lines than expected.
    bar = '=' * min(percent // 4, 25)
    sys.stdout.write("%s%-16s |%-25s| %6d/%6d %s%s    \n" % (
        color_on, self.name, bar, self.count, self.lines, status, color_off))

  def passed(self):
    """Return True if the command has been started and exited with status 0."""
    return self.proc is not None and self.proc.returncode == 0

# Parse the config: one test per line, skipping blank lines and `#` comment lines.
try:
  for line in config:
    # Lines read from a file keep their trailing newline, so a blank line is "\n", not "".
    # Strip before testing, otherwise blank lines are rejected as invalid syntax.
    entry = line.strip()
    if not entry or entry.startswith("#"):
      continue
    match = CONFIG_LINE.match(entry)
    if not match:
      sys.stderr.write("Invalid config syntax: %s\n" % entry)
      sys.exit(1)
    # Groups are (name, expected line count, command); Test takes (name, command, lines).
    tests.append(Test(match.group(1), match.group(3), int(match.group(2))))
finally:
  # Close the config even when bailing out on a syntax error.
  config.close()

# Shared stdin for the child processes.  Read-only is all a child's stdin needs; "rw" is not
# a valid open() mode (Python 3 rejects it with ValueError).
dev_null = open("/dev/null", "r")
poller = poll()
# Maps each child's stdout file descriptor to its Test, since poll() events carry only the fd.
fd_map = {}

for test in tests:
  test.start(poller)
  fd_map[test.proc.stdout.fileno()] = test

# Number of tests whose output has not yet reached end-of-file.
active_count = len(tests)

def print_bars():
  """Draw the progress line of every test in `tests`, in list order."""
  for entry in tests:
    entry.print_bar()

# Draw the initial (empty) bars so the redraw below has lines to move back over.
print_bars()

while active_count > 0:
  for (fd, event) in poller.poll():
    # update() returns True once that test's output has reached end-of-file.
    if fd_map[fd].update():
      active_count -= 1
      poller.unregister(fd)
  # Move the cursor up over the previously drawn bars and redraw them in place.
  sys.stdout.write("\033[%dA\r" % len(tests))
  print_bars()

# Rewrite the config so passing tests carry the line count actually seen; failing tests keep
# their previous expected count.  The `with` block makes sure the file is flushed and closed
# before the script exits.
# NOTE: only test entries are written back; `#` comment lines of the original config are
# not preserved.
with open(sys.argv[1], "w") as new_config:
  for test in tests:
    line_count = test.count if test.passed() else test.lines
    new_config.write("%-16s %6d %s\n" % (test.name, line_count, test.command))

# Exit status 0 only if every test passed.
sys.exit(0 if all(test.passed() for test in tests) else 1)