209 lines
6.8 KiB
Python

import itertools
import logging
import os
import re
import subprocess
import threading
import time
TIMEOUT = int(os.getenv("TIMEOUT", 20))
EXECUTOR_WORKERS = int(os.getenv("EXECUTOR_WORKERS", 20))
VERBOSE = os.getenv("VERBOSE", "0") == "1"
LOG_LEVEL = os.getenv("LOG_LEVEL", "debug")
assert LOG_LEVEL in ["trace", "debug", "info", "warn", "error"]
DEFAULT_MS_PATH = os.path.join(
os.path.dirname(__file__), "..", "..", "target/debug/minisafed"
)
MINISAFED_PATH = os.getenv("MINISAFED_PATH", DEFAULT_MS_PATH)
DEFAULT_BITCOIND_PATH = "bitcoind"
BITCOIND_PATH = os.getenv("BITCOIND_PATH", DEFAULT_BITCOIND_PATH)
COIN = 10 ** 8
def wait_for(success, timeout=TIMEOUT, debug_fn=None):
"""
Run success() either until it returns True, or until the timeout is reached.
debug_fn is logged at each call to success, it can be useful for debugging
when tests fail.
"""
start_time = time.time()
interval = 0.25
while not success() and time.time() < start_time + timeout:
if debug_fn is not None:
logging.info(debug_fn())
time.sleep(interval)
interval *= 2
if interval > 5:
interval = 5
if time.time() > start_time + timeout:
raise ValueError("Error waiting for {}", success)
class RpcError(ValueError):
def __init__(self, method: str, payload: dict, error: str):
super(ValueError, self).__init__(
"RPC call failed: method: {}, payload: {}, error: {}".format(
method, payload, error
)
)
self.method = method
self.payload = payload
self.error = error
class TailableProc(object):
"""A monitorable process that we can start, stop and tail.
This is the base class for the daemons. It allows us to directly
tail the processes and react to their output.
"""
def __init__(self, outputDir=None, verbose=True):
self.logs = []
self.logs_cond = threading.Condition(threading.RLock())
self.env = os.environ.copy()
self.running = False
self.proc = None
self.outputDir = outputDir
self.logsearch_start = 0
# Set by inherited classes
self.cmd_line = []
self.prefix = ""
# Should we be logging lines we read from stdout?
self.verbose = verbose
# A filter function that'll tell us whether to filter out the line (not
# pass it to the log matcher and not print it to stdout).
self.log_filter = lambda _: False
def start(self, stdin=None, stdout=None, stderr=None):
"""Start the underlying process and start monitoring it."""
logging.debug("Starting '%s'", " ".join(self.cmd_line))
self.proc = subprocess.Popen(
self.cmd_line,
stdin=stdin,
stdout=stdout if stdout else subprocess.PIPE,
stderr=stderr if stderr else subprocess.PIPE,
env=self.env,
)
self.thread = threading.Thread(target=self.tail)
self.thread.daemon = True
self.thread.start()
self.running = True
def save_log(self):
if self.outputDir:
logpath = os.path.join(self.outputDir, "log")
with open(logpath, "w") as f:
for l in self.logs:
f.write(l + "\n")
def stop(self, timeout=10):
self.save_log()
self.proc.terminate()
# Now give it some time to react to the signal
rc = self.proc.wait(timeout)
if rc is None:
self.proc.kill()
self.proc.wait()
self.thread.join()
return self.proc.returncode
def kill(self):
"""Kill process without giving it warning."""
self.proc.kill()
self.proc.wait()
self.thread.join()
def tail(self):
"""Tail the stdout of the process and remember it.
Stores the lines of output produced by the process in
self.logs and signals that a new line was read so that it can
be picked up by consumers.
"""
out = self.proc.stdout.readline
err = self.proc.stderr.readline
for line in itertools.chain(iter(out, ""), iter(err, "")):
if len(line) == 0:
break
if self.log_filter(line.decode("utf-8")):
continue
if self.verbose:
logging.debug(f"{self.prefix}: {line.decode().rstrip()}")
with self.logs_cond:
self.logs.append(str(line.rstrip()))
self.logs_cond.notifyAll()
self.running = False
self.proc.stdout.close()
self.proc.stderr.close()
def is_in_log(self, regex, start=0):
"""Look for `regex` in the logs."""
ex = re.compile(regex)
for l in self.logs[start:]:
if ex.search(l):
logging.debug("Found '%s' in logs", regex)
return l
logging.debug(f"{self.prefix} : Did not find {regex} in logs")
return None
def wait_for_logs(self, regexs, timeout=TIMEOUT):
"""Look for `regexs` in the logs.
We tail the stdout of the process and look for each regex in `regexs`,
starting from last of the previous waited-for log entries (if any). We
fail if the timeout is exceeded or if the underlying process
exits before all the `regexs` were found.
If timeout is None, no time-out is applied.
"""
logging.debug("Waiting for {} in the logs".format(regexs))
exs = [re.compile(r) for r in regexs]
start_time = time.time()
pos = self.logsearch_start
while True:
if timeout is not None and time.time() > start_time + timeout:
print("Time-out: can't find {} in logs".format(exs))
for r in exs:
if self.is_in_log(r):
print("({} was previously in logs!)".format(r))
raise TimeoutError('Unable to find "{}" in logs.'.format(exs))
with self.logs_cond:
if pos >= len(self.logs):
if not self.running:
raise ValueError("Process died while waiting for logs")
self.logs_cond.wait(1)
continue
for r in exs.copy():
self.logsearch_start = pos + 1
if r.search(self.logs[pos]):
logging.debug("Found '%s' in logs", r)
exs.remove(r)
break
if len(exs) == 0:
return self.logs[pos]
pos += 1
def wait_for_log(self, regex, timeout=TIMEOUT):
"""Look for `regex` in the logs.
Convenience wrapper for the common case of only seeking a single entry.
"""
return self.wait_for_logs([regex], timeout)