Files
docker-inotify-command/monitor.py
2017-04-02 22:46:48 -04:00

304 lines
12 KiB
Python
Executable File

#!/usr/bin/python3
import datetime
import json
import logging
import os
import re
import subprocess
import sys
import tempfile
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
RUNAS = "/files/runas.sh"
#-----------------------------------------------------------------------------------------------------------------------
def remove_linefeeds(input_filename):
    """Copy a text file to a new temporary file, normalizing its line endings.

    Despite the name, what gets removed are carriage returns: the input is read in text mode with universal
    newlines, so CRLF (and bare CR) line endings arrive as LF, and the copy is written with LF endings only.
    This lets a config file saved with Windows line endings be sourced by bash.

    Args:
        input_filename: Path of the file to copy.

    Returns:
        The path of the temporary copy. It is created with delete=False, so the caller is responsible for
        removing it.
    """
    # The input is opened first so that an unreadable input does not leave a stray temp file behind, and both
    # handles are closed by the with-statement instead of leaking the temp file's descriptor.
    with open(input_filename, "r") as input_file, \
            tempfile.NamedTemporaryFile(mode="w", newline="\n", delete=False) as output_file:
        output_file.writelines(input_file)
        return output_file.name
#-----------------------------------------------------------------------------------------------------------------------
def to_seconds(timestr):
    """Convert a colon-separated time string into a number of seconds.

    Each field is treated as a base-60 digit, most significant first, so "SS", "MM:SS" and "HH:MM:SS" all work.

    Args:
        timestr: The time string, e.g. "90", "05:00" or "1:02:03".

    Returns:
        The total number of seconds as an int.

    Raises:
        ValueError: If any field is not an integer.
    """
    total = 0
    for field in timestr.split(':'):
        # Shift what we have so far up one base-60 place, then add the next field.
        total = total * 60 + int(field)
    return total
#-----------------------------------------------------------------------------------------------------------------------
# Durations are SS, MM:SS or HH:MM:SS. Each field may have any number of digits so that plain-second values such
# as "120", which the previous unanchored prefix match let through, stay valid.
_DURATION_PATTERN = "([0-9]+:){0,2}[0-9]+"


def _capture_environment(shell_command):
    """Run shell_command under bash and return the JSON environment dump it prints, as a dict."""
    pipe = subprocess.Popen(['/bin/bash', '-c', shell_command], stdout=subprocess.PIPE)
    # communicate() reads stdout to EOF and then waits for the child, so it is reaped rather than left behind.
    output, _ = pipe.communicate()
    return json.loads(output.decode('ascii'))


def _require(env, name, pattern, message):
    """Return env[name]; log message and exit with status 1 if it is missing or does not fully match pattern.

    A pattern of None only requires the variable to be defined.
    """
    value = env.get(name)
    if value is None or (pattern is not None and not re.fullmatch(pattern, value)):
        logging.error(message)
        sys.exit(1)
    return value


def read_config(config_file):
    """Load and validate the monitor settings from the environment and a bash-format config file.

    The config file is sourced by bash (after line-ending normalization) with "set -a", and the resulting
    environment is dumped as JSON. Variables already present in the process environment take precedence over
    values from the config file.

    Every value must match its expected format in full; trailing garbage such as "12abc" is rejected here with a
    configuration error rather than failing later.

    Args:
        config_file: Path to the bash-format config file.

    Returns:
        An object with the attributes watch_dir, settle_duration, max_wait_time, min_period (durations in
        seconds), user_id, group_id, command, umask (strings), debug and ignore_events_while_command_is_running
        (booleans).

    Exits the process with status 1, after logging an error, on the first invalid or missing setting.
    """
    # Shenanigans to read docker env vars and the bash format config file, so that existing config files keep
    # working unchanged.
    dump_command = '/usr/bin/python3 -c "import os, json;print(json.dumps(dict(os.environ)))"'
    normalized_config = remove_linefeeds(config_file)
    try:
        base_env = _capture_environment(dump_command)
        config_env = _capture_environment('set -a && source {} && {}'.format(normalized_config, dump_command))
    finally:
        # The normalized copy is only needed while bash sources it; don't leave it behind in the temp directory.
        os.remove(normalized_config)

    # Start from the config file's values and let the real environment override them.
    env = dict(config_env)
    env.update(base_env)

    class Args:
        pass

    args = Args()

    args.watch_dir = _require(env, "WATCH_DIR", None, "Configuration error. WATCH_DIR must be defined.")
    if not os.path.isdir(args.watch_dir):
        logging.error("Configuration error. WATCH_DIR must be a directory.")
        sys.exit(1)

    args.settle_duration = to_seconds(_require(
        env, "SETTLE_DURATION", _DURATION_PATTERN,
        "Configuration error. SETTLE_DURATION must be defined as HH:MM:SS or MM:SS or SS."))

    args.max_wait_time = to_seconds(_require(
        env, "MAX_WAIT_TIME", _DURATION_PATTERN,
        "Configuration error. MAX_WAIT_TIME must be defined as HH:MM:SS or MM:SS or SS."))

    if args.settle_duration > args.max_wait_time:
        logging.error("Configuration error. SETTLE_DURATION cannot be greater than MAX_WAIT_TIME.")
        sys.exit(1)

    args.min_period = to_seconds(_require(
        env, "MIN_PERIOD", _DURATION_PATTERN,
        "Configuration error. MIN_PERIOD must be defined as HH:MM:SS or MM:SS or SS."))

    args.user_id = _require(env, "USER_ID", "[0-9]+", "Configuration error. USER_ID must be a whole number.")
    args.group_id = _require(env, "GROUP_ID", "[0-9]+", "Configuration error. GROUP_ID must be a whole number.")
    args.command = _require(env, "COMMAND", None, "Configuration error. COMMAND must be defined.")
    args.umask = _require(env, "UMASK", "0[0-7]{3}",
                          "Configuration error. UMASK must be defined as an octal 0### number.")

    # DEBUG is optional, but when present it must be exactly 0 or 1.
    if "DEBUG" in env and not re.fullmatch("[01]", env["DEBUG"]):
        logging.error("Configuration error. DEBUG must be defined as 0 or 1.")
        sys.exit(1)
    args.debug = env.get("DEBUG") == "1"

    args.ignore_events_while_command_is_running = _require(
        env, "IGNORE_EVENTS_WHILE_COMMAND_IS_RUNNING", "[01]",
        "Configuration error. IGNORE_EVENTS_WHILE_COMMAND_IS_RUNNING must be defined as 0 or 1.") == "1"

    logging.info("CONFIGURATION:")
    logging.info(" WATCH_DIR=%s", args.watch_dir)
    logging.info("SETTLE_DURATION=%s", args.settle_duration)
    logging.info(" MAX_WAIT_TIME=%s", args.max_wait_time)
    logging.info(" MIN_PERIOD=%s", args.min_period)
    logging.info(" COMMAND=%s", args.command)
    logging.info(" USER_ID=%s", args.user_id)
    logging.info(" GROUP_ID=%s", args.group_id)
    logging.info(" UMASK=%s", args.umask)
    logging.info(" DEBUG=%s", args.debug)
    logging.info("IGNORE_EVENTS_WHILE_COMMAND_IS_RUNNING=%s", args.ignore_events_while_command_is_running)

    return args
#-----------------------------------------------------------------------------------------------------------------------
# This is the main watchdog class. When a new event is detected, the class keeps track of the time since that event was
# detected, as well as the time since any event was detected. After being reset, it starts looking for a new event
# again.
#
# This class runs in parallel with the rest of the program, so we shouldn't be missing any events due to not listening
# at the time.
class ModifyHandler(FileSystemEventHandler):
    """Watchdog handler that remembers the first event since the last reset and the time of the latest event.

    on_any_event() is invoked by the observer while the main loop polls the other methods, so the state shared
    between the two is kept consistent by publishing it in single attribute assignments.

    The handler reads the watch directory from the module-level ``args`` object.
    """

    # (event, detection time) for the first event seen since the last reset(), or None. Event and time live in
    # one tuple so a reader can never see the event without its timestamp, even if reset() runs concurrently.
    _detected = None
    # Time of the most recent accepted event; never cleared by reset().
    _last_event_time = None
    # When False, incoming events are dropped entirely.
    _enabled = True

    def on_any_event(self, event):
        """Record an incoming filesystem event unless monitoring is disabled or it targets the watch dir."""
        if not self._enabled:
            return

        # Ignore changes to the watch dir itself. event.src_path doesn't exist for delete events
        if os.path.exists(event.src_path) and os.path.samefile(args.watch_dir, event.src_path):
            return

        now = datetime.datetime.now()
        self._last_event_time = now

        if self._detected is None:
            self._detected = (event, now)

    def enable_monitoring(self, enabled):
        """Turn event recording on (True) or off (False)."""
        self._enabled = enabled

    def detected_event(self):
        """Return the first event recorded since the last reset(), or None if there has been none."""
        detected = self._detected
        return None if detected is None else detected[0]

    def reset(self):
        """Forget the detected event so the next incoming event counts as a new detection."""
        self._detected = None

    def time_since_detected(self):
        """Return the seconds elapsed since the current detected event was recorded.

        Raises TypeError if no event is currently detected.
        """
        return (datetime.datetime.now() - self._detected[1]).total_seconds()

    def time_since_last_event(self):
        """Return the seconds elapsed since the most recent recorded event.

        Raises TypeError if no event has been recorded yet.
        """
        return (datetime.datetime.now() - self._last_event_time).total_seconds()
#-----------------------------------------------------------------------------------------------------------------------
def run_command(args, event_handler):
    """Run the configured command through the runas wrapper and log its exit code.

    Args:
        args: The configuration object; user_id, group_id, umask, command and
            ignore_events_while_command_is_running are read from it.
        event_handler: The handler whose detection state is reset and whose monitoring is toggled around the
            run.
    """
    # Clear the pending detection up front: anything that arrives while the command is running (when events
    # are not being ignored) is then recorded as a fresh change instead of being wiped afterwards.
    event_handler.reset()

    logging.info("Running command with user ID %s, group ID %s, and umask %s", args.user_id, args.group_id, args.umask)
    logging.info("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")

    monitor_during_run = not args.ignore_events_while_command_is_running
    event_handler.enable_monitoring(monitor_during_run)

    command_line = [RUNAS, args.user_id, args.group_id, args.umask, args.command]
    exit_code = subprocess.call(command_line)

    # Always listen again once the command is done, whatever the setting was during the run.
    event_handler.enable_monitoring(True)

    logging.info("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
    logging.info("Finished running command. Exit code was %i", exit_code)
#-----------------------------------------------------------------------------------------------------------------------
def wait_for_change(event_handler):
    """Block until the handler reports a detected event, polling every tenth of a second, then log it.

    Args:
        event_handler: Object whose detected_event() returns the pending event, or a falsy value if none.
    """
    logging.info("Waiting for new change")

    pending = event_handler.detected_event()
    while not pending:
        time.sleep(.1)
        pending = event_handler.detected_event()

    kind = "directory" if pending.is_directory else "file"
    logging.info("Detected change to %s %s", kind, pending.src_path)
#-----------------------------------------------------------------------------------------------------------------------
def wait_for_events_to_stabilize(settle_duration, max_wait_time, event_handler):
    """Block until the watch directory has been quiet long enough, or the maximum wait has elapsed.

    Polls every tenth of a second. Returns as soon as no event has been seen for settle_duration seconds;
    otherwise gives up, with a warning, once max_wait_time seconds have passed since the change was first
    detected.

    Args:
        settle_duration: Seconds without events required to consider the directory stable.
        max_wait_time: Upper bound, in seconds since the first detected event, on how long to wait.
        event_handler: Object providing time_since_last_event() and time_since_detected().
    """
    logging.info("Waiting for watch directory to stabilize for %i seconds before triggering command", settle_duration)

    while True:
        if event_handler.time_since_last_event() >= settle_duration:
            logging.info("Watch directory stabilized for %s seconds. Triggering command.", settle_duration)
            return

        if event_handler.time_since_detected() >= max_wait_time:
            # logging.warning() is used because logging.warn() is a deprecated alias.
            logging.warning("WARNING: Watch directory didn't stabilize for %s seconds. Triggering command anyway.",
                            max_wait_time)
            return

        time.sleep(.1)
#-----------------------------------------------------------------------------------------------------------------------
def block_until_min_period(min_period, last_command_run):
    """Sleep, if necessary, until at least min_period seconds have passed since the last command run.

    Args:
        min_period: Minimum number of seconds between two command runs.
        last_command_run: datetime at which the command last ran.
    """
    seconds_since_last_run = (datetime.datetime.now() - last_command_run).total_seconds()
    remaining = min_period - seconds_since_last_run

    if remaining <= 0:
        return

    # Report the wait using the min_period argument; this function must not depend on the module-level args.
    logging.info("Command triggered, but it's too soon to run the command again. Waiting another %i seconds",
                 remaining)
    time.sleep(remaining)
#-----------------------------------------------------------------------------------------------------------------------
# The config file path is the only argument. Fail with a usage message rather than an IndexError traceback.
if len(sys.argv) < 2:
    sys.exit("Usage: {} CONFIG_FILE".format(os.path.basename(sys.argv[0])))

config_file = sys.argv[1]

# The config file's base name (without extension) labels every log line of this monitor.
name = os.path.splitext(os.path.basename(config_file))[0]

logging.basicConfig(level=logging.INFO, format='[%(asctime)s] {}: %(message)s'.format(name), datefmt='%Y-%m-%d %H:%M:%S')

# NOTE: args must remain a module-level name; ModifyHandler.on_any_event reads args.watch_dir from it.
args = read_config(config_file)

if args.debug:
    logging.getLogger().setLevel(logging.DEBUG)

logging.info("Starting monitor for %s", name)

# Launch the watchdog
event_handler = ModifyHandler()
observer = Observer()
observer.schedule(event_handler, args.watch_dir, recursive=True)
observer.start()

try:
    # Initialize this to some time in the past
    last_command_run = datetime.datetime.now() - datetime.timedelta(seconds=args.min_period+10)

    # To help keep myself sane. "waiting for change" -> "waiting to stabilize or time out" -> "command triggered" ->
    # "command running" -> "waiting for change". We can also go from "command triggered" -> "waiting to stabilize or
    # time out", if new changes are detected while we're waiting for the min_period to expire.
    state = "waiting for change"

    while True:
        # Need to put an "if" on this state because the loop can restart if new changes are detected while waiting
        # for min_period to expire.
        if state == "waiting for change":
            wait_for_change(event_handler)
            state = "waiting to stabilize or time out"

        wait_for_events_to_stabilize(args.settle_duration, args.max_wait_time, event_handler)
        state = "command triggered"

        block_until_min_period(args.min_period, last_command_run)

        # In case new events came in while we were sleeping. (But skip this if we've already waited our
        # max_wait_time)
        if event_handler.time_since_last_event() < args.settle_duration and \
                event_handler.time_since_detected() < args.max_wait_time:
            logging.info("Detected new changes while waiting.")
            state = "waiting to stabilize or time out"
            continue

        state = "command running"
        run_command(args, event_handler)
        last_command_run = datetime.datetime.now()
        state = "waiting for change"
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the monitor; fall through to the clean shutdown below.
    pass
finally:
    # Stop the observer thread on every exit path, not only on Ctrl-C.
    observer.stop()
    observer.join()

sys.exit(0)