added comments

This commit is contained in:
2019-05-04 19:13:21 +02:00
parent 9e7db053ae
commit d884e61da4
5 changed files with 184 additions and 139 deletions

View File

@@ -3,23 +3,32 @@ import os
_config = None
def load():
def _set_defaults(config):
config.add_section('pushover')
config.add_section('general')
config.add_section('smtp')
config['pushover']['apikey'] = ''
config['general']['secretkey'] = ''
config['general']['baseurl'] = ''
config['general']['adminmail'] = ''
config['smtp']['server'] = ''
config['smtp']['username'] = ''
config['smtp']['password'] = ''
def load(cfg_file='informentor.ini'):
'''Load the config from the file'''
global _config
if _config is None:
_config = configparser.ConfigParser()
if not os.path.isfile('infomentor.ini'):
_config.add_section('pushover')
_config.add_section('general')
_config.add_section('smtp')
_config['pushover']['apikey'] = ''
_config['general']['secretkey'] = ''
_config['general']['baseurl'] = ''
_config['smtp']['server'] = ''
_config['smtp']['username'] = ''
_config['smtp']['password'] = ''
with open('infomentor.ini', 'w+') as f:
_config.write(f)
_config.read('infomentor.ini')
if not os.path.isfile(cfg_file):
_set_defaults(_config)
save(cfg_file)
_config.read(cfg_file)
return _config
def save(cfg_file='informentor.ini'):
'''Write config to file'''
global _config
with open(cfg_file, 'w+') as f:
_config.write(f)

View File

@@ -23,10 +23,12 @@ class InfomentorFile(object):
@property
def targetfile(self):
'''Get the files output path'''
return os.path.join(self.directory, self.fullfilename)
@property
def targetdir(self):
'''Get the files output directory'''
return os.path.join(self.directory, self.randomid)
@property
@@ -36,6 +38,7 @@ class InfomentorFile(object):
return os.path.join(self.randomid, self.filename)
def save_file(self, content):
'''Write file to the registered path'''
os.makedirs(self.targetdir, exist_ok=True)
with open(self.targetfile, 'wb+') as f:
f.write(content)
@@ -136,6 +139,7 @@ class Infomentor(object):
)
def _get_hidden_fields(self):
'''Extracts key/value elements from hidden fields'''
hiddenfields = self._extract_hidden_fields()
field_values = {}
for f in hiddenfields:
@@ -151,14 +155,15 @@ class Infomentor(object):
return field_values
def _extract_hidden_fields(self):
'''Extracts all the hidden fields from a infomentor login page'''
hidden_re = '<input type="hidden"(.*?) />'
hiddenfields = re.findall(hidden_re, self._last_result.text)
return hiddenfields
def _finalize_login(self):
'''The final login step to get the cookie'''
# Read the oauth token which is the final token for the login
oauth_token = self._get_auth_token()
# authenticate
self._do_post(
self._im1_url('mentor/'),
data={'oauth_token': oauth_token}
@@ -243,27 +248,26 @@ class Infomentor(object):
return file.fullfilename
def _build_url(self, path='', base=BASE_IM1):
'''Builds a general infomentor (IM1) url'''
return '{}/{}'.format(base, path)
def _mim_url(self, path=''):
'''Builds a general mein.infomentor (MIM) url'''
return self._build_url(path, base=self.BASE_MIM)
def _im1_url(self, path=''):
'''Builds a general infomentor (IM1) url'''
return self._build_url(path, base=self.BASE_IM1)
def get_news_list(self):
'''Fetches the list of news'''
self.logger.info('fetching news')
self._do_post(self._mim_url('News/news/GetArticleList'))
news_json = self.get_json_return()
return [str(i['id']) for i in news_json['items']]
def parse_news(self, news_json):
idlist = [str(i['id']) for i in im_news['items']]
self.logger.info('Parsing %d news (%s)', im_news['totalItems'], ', '.join(idlist))
for news_item in reversed(im_news['items']):
newsdata = self.im.get_article(news_item['id'])
def get_news_article(self, id):
'''Receive all the article information'''
article_json = self.get_article(id)
storenewsdata = {
k: article_json[k] for k in ('title', 'content', 'date')
@@ -285,6 +289,7 @@ class Infomentor(object):
return news
def get_article(self, id):
'''Receive the article details'''
self.logger.info('fetching article: %s', id)
self._do_post(
self._mim_url('News/news/GetArticle'),
@@ -293,12 +298,14 @@ class Infomentor(object):
return self.get_json_return()
def get_newsimage(self, id):
'''Fetches the image to a corresponding news entry'''
self.logger.info('fetching article image: %s', id)
filename = '{}.image'.format(id)
url = 'News/NewsImage/GetImage?id={}'.format(id)
return self.download_file(url, directory='images', filename=filename)
def get_calendar(self, offset=0, weeks=1):
'''Fetches a list of calendar entries'''
self.logger.info('fetching calendar')
utcoffset = self._get_utc_offset()
data = {
@@ -316,6 +323,7 @@ class Infomentor(object):
return self.get_json_return()
def get_event(self, eventid):
'''Request the event details from the server'''
self.logger.info('fetching calendar entry')
data = {'id': eventid}
self._do_post(
@@ -325,6 +333,7 @@ class Infomentor(object):
return self.get_json_return()
def get_homework(self, offset=0):
'''Receives a list of homework for the week'''
self.logger.info('fetching homework')
startofweek = self._get_start_of_week(offset)
timestamp = startofweek.strftime('%Y-%m-%dT00:00:00.000Z')
@@ -339,6 +348,7 @@ class Infomentor(object):
return self.get_json_return()
def get_homework_list(self):
'''Receives a list of homework'''
self._homework = {}
homeworklist = []
homework = []
@@ -382,6 +392,7 @@ class Infomentor(object):
return self.get_json_return()
def get_json_return(self):
'''Read the json from the result or write the response to the log'''
try:
return self._last_result.json()
except json.JSONDecodeError as jse:
@@ -391,6 +402,7 @@ class Infomentor(object):
raise
def _get_week_dates(self, offset=0, weeks=1):
'''Convert the current week, an offset and the timespan in weeks to start and end days'''
weekoffset = datetime.timedelta(days=7*offset)
startofweek = self._get_start_of_week()
@@ -409,11 +421,13 @@ class Infomentor(object):
return data
def _get_utc_offset(self):
'''Calculate the UTCoffset'''
now = datetime.datetime.now()
utctime = datetime.datetime.utcnow()
return (now.hour - utctime.hour)*60
def _get_start_of_week(self, offset=0):
'''Get the start of the current + offset week'''
now = datetime.datetime.now()
dayofweek = now.weekday()
startofweek = now - datetime.timedelta(days=dayofweek)

View File

@@ -5,6 +5,7 @@ from sqlalchemy.orm import sessionmaker
_session = None
def get_db(filename='infomentor.db'):
'''Get the database session for infomentor'''
global _session
if _session is None:
engine = create_engine(f'sqlite:///{filename}')

View File

@@ -1,12 +1,15 @@
import os
class flock(object):
'''A simple filelocking mechanism to prevent execution at the same time'''
filename = '.im.lock'
def __init__(self):
'''Creates an object with the current pid'''
self.pid = os.getpid()
def aquire(self):
'''Try to get the lock, if it fails it returns False'''
if self.is_locked():
return False
with open(self.filename, 'w+') as f:
@@ -14,23 +17,28 @@ class flock(object):
return True
def release(self):
'''Release the lock'''
if self.own_lock():
os.unlink(self.filename)
def __del__(self):
'''Release on delete'''
self.release()
def own_lock(self):
'''Check if the lock is assigned to the current pid'''
lockinfo = self._get_lockinfo()
return lockinfo == self.pid
def is_locked(self):
'''Check if it is currently locked'''
lockinfo = self._get_lockinfo()
if not lockinfo:
return False
return self._is_process_active(lockinfo)
def _is_process_active(self, pid):
'''Check if the processed having the lock is still running'''
try:
os.kill(pid, 0)
return pid != self.pid
@@ -38,6 +46,7 @@ class flock(object):
return False
def _get_lockinfo(self):
'''Retrieve the information about the lock'''
try:
lock = {}
with open(self.filename, 'r') as f:

View File

@@ -8,50 +8,61 @@ import datetime
import math
import pushover
from icalendar import Event, vDate, Calendar
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import mimetypes
import smtplib
cfg = config.load()
pushover.init(cfg['pushover']['apikey'] )
pushover.init(cfg["pushover"]["apikey"])
class Informer(object):
'''The Logic part of the infomentor notifier.
"""The Logic part of the infomentor notifier.
This class offers the methods required to notify a user of new News and Homework items posted on infomentor.'''
This class offers the methods required to notify a user of new News and Homework items posted on infomentor."""
def __init__(self, user, im, logger):
self.logger = logger or logging.getLogger(__name__)
self.user = user
self.im = im
def send_status_update(self, text):
'''In case something unexpected happends and the user has activated the feature to get notified about it, this will send out the information'''
"""In case something unexpected happends and the user has activated the feature to get notified about it, this will send out the information"""
try:
if self.user.notification.ntype == model.Notification.Types.PUSHOVER:
pushover.Client(self.user.notification.info).send_message(
text,
title='Status Infomentor',
html=False,
text, title="Status Infomentor", html=False
)
elif self.user.notification.ntype == model.Notification.Types.EMAIL:
mail = MIMEText(text)
mail['Subject'] = f'Status Infomentor'
mail['From'] = 'infomentor@09a.de'
mail['To'] = self.user.notification.info
self._send_mail(mail)
self._send_text_mail(
self.user.notification.info, "Status Infomentor", text
)
except:
mail = MIMEText("Fehler bei Infomentor")
mail['Subject'] = f'Fehler bei infomentor'
mail['From'] = 'infomentor@09a.de'
mail['To'] = 'matthias@bilger.info'
self._send_mail(mail)
self._send_text_mail(
cfg["general"]["adminmail"],
"Fehler bei infomentor",
"Fehler bei Infomentor",
)
def update_news(self):
session = db.get_db()
newslist = self.im.get_news_list()
for newsid in newslist:
news = session.query(model.News).filter(model.News.news_id == newsid).with_parent(self.user, 'news').one_or_none()
if news is None:
news = self.im.get_news_article(newsid)
self._notify_news(news)
self.user.news.append(news)
session.commit()
news = (
session.query(model.News)
.filter(model.News.news_id == newsid)
.with_parent(self.user, "news")
.one_or_none()
)
if news is not None:
continue
news = self.im.get_news_article(newsid)
self._notify_news(news)
self.user.news.append(news)
session.commit()
def _notify_news(self, news):
if self.user.notification.ntype == model.Notification.Types.PUSHOVER:
@@ -59,17 +70,23 @@ class Informer(object):
elif self.user.notification.ntype == model.Notification.Types.EMAIL:
self._notify_news_mail(news)
elif self.user.notification.ntype == model.Notification.Types.FAKE:
with open('{}.txt'.format(self.user.name), 'a+') as f:
f.write('Notification:\n---------8<-------\n{}\n---------8<-------\n\n'.format(news.content))
with open("{}.txt".format(self.user.name), "a+") as f:
f.write(
"Notification:\n---------8<-------\n{}\n---------8<-------\n\n".format(
news.content
)
)
else:
raise Exception('invalid notification')
raise Exception("invalid notification")
pass
def _notify_news_pushover(self, news):
text = news.content
for attachment in news.attachments:
fid, fname = attachment.localpath.split('/')
text += '''<br>Attachment {0}: {2}/{1} <br>'''.format(fname, attachment.localpath, cfg['general']['baseurl'])
fid, fname = attachment.localpath.split("/")
text += """<br>Attachment {0}: {2}/{1} <br>""".format(
fname, attachment.localpath, cfg["general"]["baseurl"]
)
parsed_date = dateparser.parse(news.date)
now = datetime.datetime.now()
parsed_date += datetime.timedelta(hours=now.hour, minutes=now.minute)
@@ -77,73 +94,54 @@ class Informer(object):
if len(text) > 900:
url = self._make_site(text)
shorttext = text[:900]
text = '{}...\n\nfulltext saved at: {}'.format(shorttext, url)
text = text.replace('<br>', '\n')
text = "{}...\n\nfulltext saved at: {}".format(shorttext, url)
text = text.replace("<br>", "\n")
try:
self.logger.info(text)
self.logger.info(news.title)
if news.imagefile is not None:
image = open(os.path.join('images', news.imagefile), 'rb')
image = open(os.path.join("images", news.imagefile), "rb")
else:
image = None
pushover.Client(self.user.notification.info).send_message(
text,
title=news.title,
attachment=image,
html=True,
timestamp=timestamp
text, title=news.title, attachment=image, html=True, timestamp=timestamp
)
except pushover.RequestError as e:
self.logger.error('Sending notification failed', exc_info=e)
self.logger.error("Sending notification failed", exc_info=e)
finally:
if image is not None:
image.close()
def _make_site(self, text):
filename = str(uuid.uuid4())
fpath = os.path.join('files', filename+'.html')
fpath = os.path.join("files", filename + ".html")
urlfinder = re.compile("(https?://[^ \n\t]*)")
text = urlfinder.sub(r'<a href="\1">\1</a>', text)
text = '<html> <head> <meta charset="utf-8" /> </head> <body>{}</body></html>'.format(text)
with open(fpath, 'w+') as f:
text = '<html> <head> <meta charset="utf-8" /> </head> <body>{}</body></html>'.format(
text
)
with open(fpath, "w+") as f:
f.write(text)
return '{}/{}.html'.format(cfg['general']['baseurl'], filename)
return "{}/{}.html".format(cfg["general"]["baseurl"], filename)
def _notify_news_mail(self, news):
# Import the email modules we'll need
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import mimetypes
import smtplib
outer = MIMEMultipart()
text = news.content.replace('<br>', '\n')
outer.attach(MIMEText(text + '\n\n'))
outer['Subject'] = f'INFOMENTOR News: {news.title}'
outer['From'] = 'infomentor@09a.de'
outer['To'] = self.user.notification.info
for attachment in news.attachments:
fid, fname = attachment.localpath.split('/')
filename = os.path.join('files', attachment.localpath)
ctype, encoding = mimetypes.guess_type(filename)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
with open(filename, 'rb') as fp:
msg = MIMEBase(maintype, subtype)
msg.set_payload(fp.read())
encoders.encode_base64(msg)
msg.add_header('Content-Disposition', 'attachment', filename=fname)
outer.attach(msg)
self._send_mail(outer)
self._send_attachment_mail(
news.content,
f"INFOMENTOR Homework: {news.title}",
news.attachments,
self.user.notification.info,
)
def update_homework(self):
session = db.get_db()
homeworklist = self.im.get_homework_list()
for homeworkid in homeworklist:
homework = session.query(model.Homework).filter(model.Homework.homework_id == homeworkid).with_parent(self.user, 'homeworks').one_or_none()
homework = (
session.query(model.Homework)
.filter(model.Homework.homework_id == homeworkid)
.with_parent(self.user, "homeworks")
.one_or_none()
)
if homework is None:
homework = self.im.get_homework_info(homeworkid)
self._notify_hw(homework)
@@ -156,75 +154,89 @@ class Informer(object):
elif self.user.notification.ntype == model.Notification.Types.EMAIL:
self._notify_hw_mail(hw)
elif self.user.notification.ntype == model.Notification.Types.FAKE:
with open('{}.txt'.format(self.user.name), 'a+') as f:
f.write('Notification:\n---------8<-------\n{}\n---------8<-------\n\n'.format(hw.text))
with open("{}.txt".format(self.user.name), "a+") as f:
f.write(
"Notification:\n---------8<-------\n{}\n---------8<-------\n\n".format(
hw.text
)
)
else:
raise Exception('invalid notification')
raise Exception("invalid notification")
pass
def _notify_hw_pushover(self, hw):
text = hw.text
for attachment in hw.attachments:
fid, fname = attachment.localpath.split('/')
text += '''<br>Attachment {0}: {2}/{1}<br>'''.format(fname, attachment.localpath, cfg['general']['baseurl'])
fid, fname = attachment.localpath.split("/")
text += """<br>Attachment {0}: {2}/{1}<br>""".format(
fname, attachment.localpath, cfg["general"]["baseurl"]
)
if len(text) > 900:
url = self._make_site(text)
shorttext = text[:900]
text = '{}...\n\nfulltext saved at: {}'.format(shorttext, url)
text = text.replace('<br>', '\n')
text = "{}...\n\nfulltext saved at: {}".format(shorttext, url)
text = text.replace("<br>", "\n")
try:
self.logger.info(text)
self.logger.info(hw.subject)
pushover.Client(self.user.notification.info).send_message(
text,
title=f"Homework: {hw.subject}",
html=True,
text, title=f"Homework: {hw.subject}", html=True
)
except pushover.RequestError as e:
self.logger.error('Sending notification failed', exc_info=e)
self.logger.error("Sending notification failed", exc_info=e)
def _notify_hw_mail(self, hw):
# Import the email modules we'll need
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import mimetypes
import smtplib
self._send_attachment_mail(
hw.text,
f"INFOMENTOR Homework: {hw.subject}",
hw.attachments,
self.user.notification.info,
)
def _send_attachment_mail(
self, text, subject, attachments, to, fr="infomentor@09a.de"
):
outer = MIMEMultipart()
text = hw.text.replace('<br>', '\n')
outer.attach(MIMEText(text + '\n\n'))
outer['Subject'] = f'INFOMENTOR Homework: {hw.subject}'
outer['From'] = 'infomentor@09a.de'
outer['To'] = self.user.notification.info
for attachment in hw.attachments:
fid, fname = attachment.localpath.split('/')
filename = os.path.join('files', attachment.localpath)
text = text.replace("<br>", "\n")
outer.attach(MIMEText(text + "\n\n"))
outer["Subject"] = subject
outer["From"] = fr
outer["To"] = to
for attachment in attachments:
fid, fname = attachment.localpath.split("/")
filename = os.path.join("files", attachment.localpath)
ctype, encoding = mimetypes.guess_type(filename)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
with open(filename, 'rb') as fp:
ctype = "application/octet-stream"
maintype, subtype = ctype.split("/", 1)
with open(filename, "rb") as fp:
msg = MIMEBase(maintype, subtype)
msg.set_payload(fp.read())
encoders.encode_base64(msg)
msg.add_header('Content-Disposition', 'attachment', filename=fname)
msg.add_header("Content-Disposition", "attachment", filename=fname)
outer.attach(msg)
self._send_mail(outer)
def _send_text_mail(self, to, subject, text, fr="infomentor@09a.de"):
mail = MIMEText(text)
mail["Subject"] = subject
mail["From"] = fr
mail["To"] = to
self._send_mail(mail)
def _send_mail(self, mail):
s = smtplib.SMTP_SSL(cfg['smtp']['server'])
s.login(cfg['smtp']['username'], cfg['smtp']['password'])
s = smtplib.SMTP_SSL(cfg["smtp"]["server"])
s.login(cfg["smtp"]["username"], cfg["smtp"]["password"])
s.send_message(mail)
s.quit()
def update_calendar(self):
session = db.get_db()
print(self.user.icalendar)
if self.user.icalendar is None:
return
icx = icloudcalendar.iCloudConnector(self.user.icalendar.icloud_user, self.user.icalendar.password)
icx = icloudcalendar.iCloudConnector(
self.user.icalendar.icloud_user, self.user.icalendar.password
)
cname = self.user.icalendar.calendarname
cal = icx.get_named_calendar(cname)
if not cal:
@@ -234,31 +246,31 @@ class Informer(object):
for calevent in cal.events():
if calevent.data is None:
continue
uid = re.findall('UID:(.*)', calevent.data)[0]
uid = re.findall("UID:(.*)", calevent.data)[0]
known_entries[uid] = calevent
for entry in calentries:
self.logger.debug(entry)
uid = 'infomentor_{}'.format(entry['id'])
event_details = self.im.get_event(entry['id'])
uid = "infomentor_{}".format(entry["id"])
event_details = self.im.get_event(entry["id"])
self.logger.debug(event_details)
calend = Calendar()
event = Event()
event.add('uid', 'infomentor_{}'.format(entry['id']))
event.add('summary', entry['title'])
event.add('description', event_details['notes'])
if not event_details['allDayEvent']:
event.add('dtstart', dateparser.parse(entry['start']))
event.add('dtend', dateparser.parse(entry['end']))
event.add("uid", "infomentor_{}".format(entry["id"]))
event.add("summary", entry["title"])
event.add("description", event_details["notes"])
if not event_details["allDayEvent"]:
event.add("dtstart", dateparser.parse(entry["start"]))
event.add("dtend", dateparser.parse(entry["end"]))
else:
event.add('dtstart', dateparser.parse(entry['start']).date())
event.add('dtend', dateparser.parse(entry['end']).date())
event.add("dtstart", dateparser.parse(entry["start"]).date())
event.add("dtend", dateparser.parse(entry["end"]).date())
calend.add_component(event)
new_cal_entry = calend.to_ical().decode('utf-8').replace('\r','')
new_cal_entry = calend.to_ical().decode("utf-8").replace("\r", "")
if uid in known_entries:
if known_entries[uid].data == new_cal_entry:
self.logger.info('no change for calendar entry {}'.format(uid))
self.logger.info("no change for calendar entry {}".format(uid))
continue
self.logger.debug(calend.to_ical())
cal.add_event(calend.to_ical())