import bs4 import itertools import time import requests import configparser import urllib.parse import mechanize import pushover import datetime import os import logging #logging.basicConfig(level=getattr(logging, os.environ.get("LOG_LEVEL", "INFO"))) logging.basicConfig(level=logging.DEBUG) _logger = logging.getLogger(__name__) def split_notify(string): values = string.split(":") return (values[0], values[1:]) def main(): try: pushover.init(os.environ["PUSHOVER_KEY"]) except: _logger.exception("Pushover KEY is required") return while True: try: _logger.info("Starting Bibcheck") if "HEALTHCHECK_URL" in os.environ: _logger.info("Pinging Healthcheck [start]") _logger.debug( f"Pinging Healthcheck Url {os.environ['HEALTHCHECK_URL']}/start" ) requests.get(f"{os.environ['HEALTHCHECK_URL']}/start") allinfo = [] users = list( map(lambda x: x.split(":", 1), os.environ["BIB_USERS"].split(",")) ) notify = dict(map(split_notify, os.environ["NOTIFY_USERS"].split(","))) _logger.info(f"Found {len(users)} user(s) to check for") for user, pwd in users: _logger.info(f"Check for {user}") allinfo += check(user, pwd, notify.get(user, [])) if "HEALTHCHECK_URL" in os.environ: _logger.info("Pinging Healthcheck [stop]") requests.post( f"{os.environ['HEALTHCHECK_URL']}", data="\n".join(allinfo).encode("utf8"), ) if "RUN_FOREVER" in os.environ and os.environ["RUN_FOREVER"] == "False": _logger.info("Finished") break except Exception as e: _logger.exception("Check Failed ") if "HEALTHCHECK_URL" in os.environ: _logger.info("Pinging Healthcheck [failed]") _logger.debug( f"Pinging Healthcheck Url {os.environ['HEALTHCHECK_URL']}/fail" ) requests.post( f"{os.environ['HEALTHCHECK_URL']}/fail", data="\n".join(allinfo).encode("utf8"), ) now = datetime.datetime.utcnow to = (now() + datetime.timedelta(days=1)).replace( hour=os.environ.get("RUN_AT_HOUR", 6), minute=os.environ.get("RUN_AT_MINUTE", 0), second=0, ) _logger.info(f"Sleeping till {to.isoformat()}") time.sleep((to - now()).seconds) def check(username, password, notify_ids): br = mechanize.Browser() starturl = os.environ.get( "LIBRARY_URL", "https://ssl.muenchen.de/aDISWeb/app?service=direct/0/Home/$DirectLink&sp=SOPAC&sp=SBK00000000", ) _logger.info(f"Library URL used: {starturl}") _logger.info(f"Goto loginpage") response = br.open(starturl) _logger.info(f"Fill form") br.select_form("Form0") br["$Textfield"] = username br["$Textfield$0"] = password _logger.info(f"Sumbit form") response = br.submit(name='textButton') print (b'Matthias' in response.read()) currenturl = response.geturl() cookies = br.cookiejar # _logger.info(f"Open account page using form0") #br.select_form(nr=0) #br.set_all_readonly(False) #br["$ScriptButton_hidden"] = 'BK' br.set_debug_http(True) # response = br.submit() try: _logger.info(f"Open lent list") req = mechanize.Request(currenturl + '?service=direct/1/POOLSTPM@@@@@@@@_4400E200_3B328A80/Tabelle_Z1LP01.cellInternalLink.directlink&sp=SRGLINK_5&sp=SZA&requestCount=1',method='GET',headers={'Referer': currenturl}) cookies.add_cookie_header(req) response = br.open(req) print(response.read()) _logger.info(f"Extend all") br.select_form("Form0") response = br.submit(name="textButton$0", label="Alle verlängern") lentlist = bs4.BeautifulSoup(response.read(), "html.parser") table = lentlist.select('table[class="rTable_table"]')[0] allinfo = [] _logger.info(f"Parsing table") for entry in table.tbody.select("tr"): info = list(map(lambda x: str(x.text).strip(), entry.select("td"))) date = datetime.datetime.strptime(info[1], "%d.%m.%Y") delta = date - datetime.datetime.now() allinfo.append(str(info)) if delta.days <= 10 or delta.days == 20 or delta.days == 15: message = f"Bitte an {info[3]} denken\n" if delta.days <= 7: message += '' message += f"Abgabe {info[1]}" if delta.days <= 7: message += "" message += f" - {username}" for client in itertools.chain( notify_ids, os.environ.get("PUSHOVER_CLIENTS", "").split(",") ): try: pushover.Client(client).send_message( message, title="Erinnerung", html=1 ) except: print("No client") except (StopIteration, mechanize._mechanize.LinkNotFoundError) as e: _logger.exception("Failed to read information") return [] return allinfo if __name__ == "__main__": main()