rework completed

This commit is contained in:
2018-10-21 07:27:26 +02:00
parent e63ae46724
commit f5a9421eac
9 changed files with 964 additions and 1 deletions

0
infomentor/__init__.py Normal file
View File

153
infomentor/__main__.py Normal file
View File

@@ -0,0 +1,153 @@
import infomentor.flock as flock
import logging
import argparse
import datetime
import sys
import os
from infomentor import db, model, connector, informer
logformat='{asctime} - {name:25s} - {levelname:8s} - {message}'
def logtofile():
logging.basicConfig(
level=logging.INFO,
format=logformat,
filename='log.txt',
filemode='a+',
style='{'
)
def logtoconsole():
logging.basicConfig(
level=logging.DEBUG,
format=logformat,
style='{'
)
def parse_args(arglist):
parser = argparse.ArgumentParser(description='Infomentor Grabber and Notifier')
parser.add_argument('--nolog', action='store_true', help='print log instead of logging to file')
parser.add_argument('--adduser', type=str, help='add user')
parser.add_argument('--addpushover', type=str, help='add pushover')
parser.add_argument('--addmail', type=str, help='add mail')
args = parser.parse_args(arglist)
return args
def add_user(username):
session = db.get_db()
existing_user = session.query(model.User.name == username).one_or_none()
if existing_user is not None:
print('user exists, change pw')
else:
print(f'Adding user: {username}')
import getpass
password = getpass.getpass(prompt='Password: ')
if existing_user is not None:
existing_user.password = password
else:
user = model.User(name=username, password=password)
session.add(user)
session.commit()
def add_pushover(username):
session = db.get_db()
user = session.query(model.User).filter(model.User.name == username).one_or_none()
if user is None:
print('user does not exist')
return
else:
print(f'Adding PUSHOVER for user: {username}')
id = input('PUSHOVER ID: ')
user.notification = model.Notification(ntype=model.Notification.Types.PUSHOVER, info=id)
session.commit()
def add_mail(username):
session = db.get_db()
user = session.query(model.User).filter(model.User.name == username).one_or_none()
if user is None:
print('user does not exist')
return
else:
print(f'Adding MAIL for user: {username}')
address = input('MAIL ADDRESS: ')
user.notification = model.Notification(ntype=model.Notification.Types.EMAIL, info=address)
session.commit()
def notify_users():
logger = logging.getLogger(__name__)
session = db.get_db()
for user in session.query(model.User):
logger.info('==== USER: %s =====', user.name)
if user.password == '':
logger.warning('User %s not enabled', user.name)
continue
now = datetime.datetime.now()
im = connector.Infomentor(user.name)
im.login(user.password)
logger.info('User loggedin')
statusinfo = {'datetime': now, 'ok': False, 'info': '', 'degraded_count':0}
if user.apistatus is None:
user.apistatus = model.ApiStatus(**statusinfo)
logger.info('Former API status: %s', user.apistatus)
try:
i = informer.Informer(user, im, logger=logger)
i.update_news()
i.update_homework()
statusinfo['ok'] = True
statusinfo['degraded'] = False
except Exception as e:
inforstr = 'Exception occured:\n{}:{}\n'.format(type(e).__name__, e)
statusinfo['ok'] = False
statusinfo['info'] = inforstr
logger.exception("Something went wrong")
finally:
if user.apistatus.ok == True and statusinfo['ok'] == False:
logger.error('Switching to degraded state %s', user.name)
statusinfo['degraded_count'] = 1
if user.apistatus.ok == False and statusinfo['ok'] == False:
if user.apistatus.degraded_count == 1 and user.wantstatus:
send_status_update(user, statusinfo['info'])
try:
statusinfo['degraded_count'] = user.apistatus['degraded_count'] + 1
except Exception as e:
statusinfo['degraded_count'] = 1
if user.apistatus.ok == False and statusinfo['ok'] == True:
statusinfo['info'] = 'Works as expected, failed {} times'.format(user.apistatus.degraded_count)
statusinfo['degraded_count'] = 0
if user.wantstatus:
send_status_update(user, statusinfo['info'])
user.apistatus.updateobj(statusinfo)
logger.info('New API status: %s', user.apistatus)
session.commit()
def send_status_update(user, text):
pass
def main():
args = parse_args(sys.argv[1:])
if args.nolog:
logtoconsole()
else:
logtofile()
logger = logging.getLogger('Infomentor Notifier')
logger.info('STARTING-------------------- %s', os.getpid())
try:
lock = flock.flock()
if not lock.aquire():
logger.info('EXITING - PREVIOUS IS RUNNING')
raise Exception()
if args.adduser:
add_user(args.adduser)
elif args.addpushover:
add_pushover(args.addpushover)
else:
notify_users()
except Exception as e:
logger.info('Exceptional exit')
logger.exception('Info')
finally:
logger.info('EXITING--------------------- %s', os.getpid())
if __name__ == "__main__":
main()

401
infomentor/connector.py Normal file
View File

@@ -0,0 +1,401 @@
import requests
import re
import json
import os
import http.cookiejar
import time
import math
import datetime
import contextlib
import logging
import urllib.parse
import uuid
from infomentor import model
class InfomentorFile(object):
def __init__(self, directory, filename):
if directory is None:
raise Exception('directory is required')
self.filename = filename
self.randomid = str(uuid.uuid4())
self.directory = directory
@property
def targetfile(self):
return os.path.join(self.directory, self.fullfilename)
@property
def targetdir(self):
return os.path.join(self.directory, self.randomid)
@property
def fullfilename(self):
if self.filename is None:
raise Exception('no filename set')
return os.path.join(self.randomid, self.filename)
def save_file(self, content):
os.makedirs(self.targetdir, exist_ok=True)
with open(self.targetfile, 'wb+') as f:
f.write(content)
class Infomentor(object):
'''Basic object for handling infomentor site login and fetching of data'''
BASE_IM1 = 'https://im1.infomentor.de/Germany/Germany/Production'
BASE_MIM = 'https://mein.infomentor.de'
def __init__(self, user, logger=None):
'''Create informentor object for username'''
self.logger = logger or logging.getLogger(__name__)
self._last_result = None
self.user = user
self._create_session()
def _create_session(self):
'''Create the session for handling all further requests'''
self.session = requests.Session()
self.session.headers.update({'User-Agent': 'Mozilla/5.0'})
self._load_cookies()
def _load_cookies(self):
'''Setup the cookie requests'''
os.makedirs('cookiejars', exist_ok=True)
self.session.cookies = http.cookiejar.MozillaCookieJar(
filename='cookiejars/{}.cookies'.format(self.user)
)
with contextlib.suppress(FileNotFoundError):
self.session.cookies.load(ignore_discard=True, ignore_expires=True)
def login(self, password):
'''Login using the given password'''
if self.logged_in(self.user):
return True
self._do_login(self.user, password)
return self.logged_in(self.user)
def logged_in(self, username):
'''Check if user is logged in (with cookies)'''
ts = math.floor(time.time())
auth_check_url = 'authentication/authentication/' + \
'isauthenticated/?_={}000'.format(ts)
url = self._mim_url(auth_check_url)
r = self._do_post(url)
self.logger.info('%s loggedin: %s', username, r.text)
return r.text.lower() == 'true'
def _do_login(self, user, password):
self._do_request_initial_token()
self._perform_login(password)
self._finalize_login()
def _do_request_initial_token(self):
'''Request initial oauth_token'''
# Get the initial oauth token
self._do_get(self._mim_url())
self._oauth_token = self._get_auth_token()
# This request is performed by the browser, the reason is unclear
login_url = self._mim_url(
'Authentication/Authentication/Login?ReturnUrl=%2F')
self._do_get(login_url)
def _get_auth_token(self):
'''Reading oauth_token from response text'''
token_re = r'name="oauth_token" value="([^"]*)"'
tokens = re.findall(token_re, self._last_result.text)
if len(tokens) != 1:
self.logger.error('OAUTH_TOKEN not found')
raise Exception('Invalid Count of tokens')
return tokens[0]
def _perform_login(self, password):
self._do_post(
self._im1_url('mentor/'),
data={'oauth_token': self._oauth_token}
)
# Extract the hidden fields content
payload = self._get_hidden_fields()
# update with the missing and the login parameters
payload.update({
'login_ascx$txtNotandanafn': self.user,
'login_ascx$txtLykilord': password,
'__EVENTTARGET': 'login_ascx$btnLogin',
'__EVENTARGUMENT': ''
})
# perform the login
self._do_post(
self._im1_url('mentor/'),
data=payload,
headers={
'Referer': self._im1_url('mentor/'),
'Content-Type': 'application/x-www-form-urlencoded'
}
)
def _get_hidden_fields(self):
hiddenfields = self._extract_hidden_fields()
field_values = {}
for f in hiddenfields:
names = re.findall('name="([^"]*)"', f)
if len(names) != 1:
self.logger.error('Could not parse hidden field (fieldname)')
continue
values = re.findall('value="([^"]*)"', f)
if len(values) != 1:
self.logger.error('Could not parse hidden field (value)')
continue
field_values[names[0]] = values[0]
return field_values
def _extract_hidden_fields(self):
hidden_re = '<input type="hidden"(.*?) />'
hiddenfields = re.findall(hidden_re, self._last_result.text)
return hiddenfields
def _finalize_login(self):
# Read the oauth token which is the final token for the login
oauth_token = self._get_auth_token()
# authenticate
self._do_post(
self._im1_url('mentor/'),
data={'oauth_token': oauth_token}
)
self._do_get(self._mim_url())
def _do_post(self, url, **kwargs):
'''Post request for session'''
self.logger.info('post to: %s', url)
if 'data' in kwargs:
self.logger.info('data: %s', json.dumps(kwargs['data'], indent=2))
self._last_result = self.session.post(url, **kwargs)
self.logger.info('result: %d', self._last_result.status_code)
self._save_cookies()
return self._last_result
def _do_get(self, url, **kwargs):
'''get request for session'''
self.logger.info('get: %s', url)
self._last_result = self.session.get(url, **kwargs)
self.logger.info('result: %d', self._last_result.status_code)
self._save_cookies()
if self._last_result.status_code != 200:
raise Exception('Got response with code {}'.format(
self._last_result.status_code
))
return self._last_result
def _save_cookies(self):
'''Save cookies'''
self.session.cookies.save(ignore_discard=True, ignore_expires=True)
def download_file(self, url, filename=None, directory=None):
'''download a file with given name or provided filename'''
self.logger.info('fetching download: %s', url)
if filename is not None or directory is not None:
return self._download_file(url, directory, filename)
else:
self.logger.error('fetching download requires filename or folder')
raise Exception('Download Failed')
def _get_filename_from_cd(self):
'''determine filename from headers or random uuid'''
cd = self._last_result.headers.get('content-disposition')
if cd:
filename_re = r'''
.* # Anything
(?:
filename=(?P<native>.+) # normal filename
|
filename\*=(?P<extended>.+) # extended filename
) # The filename
(?:$|;.*) # End or more
'''
fname = re.match(filename_re, cd, flags=re.VERBOSE)
filename = fname.group('native')
if filename is not None and len(filename) != 0:
return filename
filename = fname.group('extended')
if filename is not None and len(filename) != 0:
encoding, string = filename.split("''")
return urllib.parse.unquote(string, encoding)
filename = str(uuid.uuid4())
self.logger.warning(
'no filename detected in %s: using random filename %s',
cd, filename)
return filename
def _download_file(self, url, directory, filename=None):
'''download a file with provided filename'''
file = InfomentorFile(directory, filename)
self.logger.info('to (randomized) directory %s', file.targetdir)
url = self._mim_url(url)
self._do_get(url)
if filename is None:
self.logger.info('determine filename from headers')
filename = self._get_filename_from_cd()
self.logger.info('determined filename: %s', filename)
file.filename = filename
self.logger.info('full filename: %s', file.fullfilename)
file.save_file(self._last_result.content)
return file.fullfilename
def _build_url(self, path='', base=BASE_IM1):
return '{}/{}'.format(base, path)
def _mim_url(self, path=''):
return self._build_url(path, base=self.BASE_MIM)
def _im1_url(self, path=''):
return self._build_url(path, base=self.BASE_IM1)
def get_news_list(self):
self.logger.info('fetching news')
self._do_post(self._mim_url('News/news/GetArticleList'))
news_json = self.get_json_return()
return [str(i['id']) for i in news_json['items']]
def parse_news(self, news_json):
idlist = [str(i['id']) for i in im_news['items']]
self.logger.info('Parsing %d news (%s)', im_news['totalItems'], ', '.join(idlist))
for news_item in reversed(im_news['items']):
newsdata = self.im.get_article(news_item['id'])
def get_news_article(self, id):
article_json = self.get_article(id)
storenewsdata = {
k: article_json[k] for k in ('title', 'content', 'date')
}
storenewsdata['news_id'] = article_json['id']
storenewsdata['raw'] = json.dumps(article_json)
storenewsdata['attachments'] = []
for attachment in article_json['attachments']:
self.logger.info('found attachment %s', attachment['title'])
att_id = re.findall('Download/([0-9]+)?', attachment['url'])[0]
f = self.download_file(attachment['url'], directory='files')
try:
storenewsdata['attachments'].append(model.Attachment(attachment_id=att_id, url=attachment['url'], localpath=f, title=attachment['title']))
except Exception as e:
self.logger.exception('failed to store attachment')
news = model.News(**storenewsdata)
with contextlib.suppress(Exception):
news.imagefile = self.get_newsimage(id)
return news
def get_article(self, id):
self.logger.info('fetching article: %s', id)
self._do_post(
self._mim_url('News/news/GetArticle'),
data={'id': id}
)
return self.get_json_return()
def get_newsimage(self, id):
self.logger.info('fetching article image: %s', id)
filename = '{}.image'.format(id)
url = self._mim_url('News/NewsImage/GetImage?id={}'.format(id))
return self.download_file(url, directory='images', filename=filename)
def get_calendar(self, offset=0, weeks=1):
self.logger.info('fetching calendar')
data = self._get_week_dates(offset=offset, weeks=weeks)
self._do_post(
self._mim_url('Calendar/Calendar/getEntries'),
data=data
)
return self.get_json_return()
def get_homework(self, offset=0):
self.logger.info('fetching homework')
startofweek = self._get_start_of_week(offset)
timestamp = startofweek.strftime('%Y-%m-%dT00:00:00.000Z')
data = {
'date': timestamp,
'isWeek': True,
}
self._do_post(
self._mim_url('Homework/homework/GetHomework'),
data=data
)
return self.get_json_return()
def get_homework_list(self):
self._homework = {}
homeworklist = []
homework = []
homework.extend(self.get_homework())
homework.extend(self.get_homework(1))
for dategroup in homework:
for hw in dategroup['items']:
if hw['id'] == 0:
continue
else:
self._homework[hw['id']] = hw
homeworklist.append(hw['id'])
return homeworklist
def get_homework_info(self, id):
hw = self._homework[id]
storehw = {
k: hw[k] for k in ('subject', 'courseElement')
}
storehw['homework_id'] = hw['id']
storehw['text'] = hw['homeworkText']
storehw['attachments'] = []
for attachment in hw['attachments']:
self.logger.info('found attachment %s', attachment['title'])
att_id = re.findall('Download/([0-9]+)?', attachment['url'])[0]
f = self.download_file(attachment['url'], directory='files')
try:
storehw['attachments'].append(model.Attachment(attachment_id=att_id, url=attachment['url'], localpath=f, title=attachment['title']))
except Exception as e:
self.logger.exception('failed to store attachment')
hw = model.Homework(**storehw)
return hw
def get_timetable(self, offset=0):
self.logger.info('fetching timetable')
data = self._get_week_dates(offset)
self._do_post(
self._mim_url('timetable/timetable/gettimetablelist'),
data=data
)
return self.get_json_return()
def get_json_return(self):
try:
return self._last_result.json()
except json.JSONDecodeError as jse:
self.logger.exception('JSON coudl not be decoded')
self.logger.info('status code: %d', self._last_result.status_code)
self.logger.info('response was: %s', self._last_result.text)
raise
def _get_week_dates(self, offset=0, weeks=1):
weekoffset = datetime.timedelta(days=7*offset)
startofweek = self._get_start_of_weekdays()
endofweek = startofweek + datetime.timedelta(days=5+7*(weeks-1))
startofweek += weekoffset
endofweek += weekoffset
now = datetime.datetime.now()
utctime = datetime.datetime.utcnow()
utcoffset = (now.tm_hour - utctime.tm_hour)*60
data = {
'UTCOffset': utcoffset,
'start': startofweek.strftime('%Y-%m-%d'),
'end': endofweek.strftime('%Y-%m-%d'),
}
return data
def _get_start_of_week(self, offset=0):
now = datetime.datetime.now()
dayofweek = now.weekday()
startofweek = now - datetime.timedelta(days=dayofweek)
startofweek -= datetime.timedelta(days=offset*7)
return startofweek

16
infomentor/db.py Executable file
View File

@@ -0,0 +1,16 @@
from infomentor import model
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
_session = None
def get_db(filename='infomentor.db'):
global _session
if _session is None:
engine = create_engine(f'sqlite:///{filename}')
model.ModelBase.metadata.create_all(engine)
model.ModelBase.metadata.bind = engine
DBSession = sessionmaker(bind=engine)
_session = DBSession()
return _session

48
infomentor/flock.py Normal file
View File

@@ -0,0 +1,48 @@
import os
class flock(object):
filename = '.im.lock'
def __init__(self):
self.pid = os.getpid()
def aquire(self):
if self.is_locked():
return False
with open(self.filename, 'w+') as f:
f.write('{}'.format(self.pid))
return True
def release(self):
if self.own_lock():
os.unlink(self.filename)
def __del__(self):
self.release()
def own_lock(self):
lockinfo = self._get_lockinfo()
return lockinfo == self.pid
def is_locked(self):
lockinfo = self._get_lockinfo()
if not lockinfo:
return False
return self._is_process_active(lockinfo)
def _is_process_active(self, pid):
try:
os.kill(pid, 0)
return pid != self.pid
except Exception as e:
return False
def _get_lockinfo(self):
try:
lock = {}
with open(self.filename, 'r') as f:
pid = int(f.read().strip())
return pid
except Exception as e:
return False

188
infomentor/informer.py Executable file
View File

@@ -0,0 +1,188 @@
from infomentor import model, db
import logging
import uuid
import os
import re
import dateparser
import datetime
import math
import pushover
pushover.init('***REMOVED***')
class Informer(object):
def __init__(self, user, im, logger):
self.logger = logger or logging.getLogger(__name__)
self.user = user
self.im = im
def update_news(self):
session = db.get_db()
newslist = self.im.get_news_list()
for newsid in newslist:
news = session.query(model.News).filter(model.News.news_id == newsid).with_parent(self.user, 'news').one_or_none()
if news is None:
news = self.im.get_news_article(newsid)
self._notify_news(news)
self.user.news.append(news)
session.commit()
def _notify_news(self, news):
if self.user.notification.ntype == model.Notification.Types.PUSHOVER:
self._notify_news_pushover(news)
elif self.user.notification.ntype == model.Notification.Types.EMAIL:
self._notify_news_mail(news)
else:
raise Exception('invalid notification')
pass
def _notify_news_pushover(self, news):
text = news.content
for attachment in news.attachments:
fid, fname = attachment.localpath.split('/')
text += '''<br>Attachment {0}: https://files.hyttioaoa.de/{1}<br>'''.format(fname, attachment.localpath)
parsed_date = dateparser.parse(news.date)
now = datetime.datetime.now()
parsed_date += datetime.timedelta(hours=now.hour, minutes=now.minute)
timestamp = math.floor(parsed_date.timestamp())
if len(text) > 900:
url = self._make_site(text)
shorttext = text[:900]
text = '{}...\n\nfulltext saved at: {}'.format(shorttext, url)
text = text.replace('<br>', '\n')
try:
self.logger.info(text)
self.logger.info(news.title)
if news.imagefile is not None:
image = open(os.path.join('images', news.imagefile), 'rb')
else:
image = None
pushover.Client(self.user.notification.info).send_message(
text,
title=news.title,
attachment=image,
html=True,
timestamp=timestamp
)
except pushover.RequestError as e:
self.logger.error('Sending notification failed', exc_info=e)
finally:
if image is not None:
image.close()
def _make_site(self, text):
filename = str(uuid.uuid4())
fpath = os.path.join('files', filename+'.html')
urlfinder = re.compile("(https?://[^ \n\t]*)")
text = urlfinder.sub(r'<a href="\1">\1</a>', text)
text = '<html> <head> <meta charset="utf-8" /> </head> <body>{}</body></html>'.format(text)
with open(fpath, 'w+') as f:
f.write(text)
return 'https://files.hyttioaoa.de/{}.html'.format(filename)
def _notify_news_mail(self, news):
# Import the email modules we'll need
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import mimetypes
import smtplib
outer = MIMEMultipart()
text = news.content.replace('<br>', '\n')
outer.attach(MIMEText(text + '\n\n'))
outer['Subject'] = f'INFOMENTOR News: {news.title}'
outer['From'] = 'infomentor@09a.de'
outer['To'] = self.user.notification.info
for attachment in news.attachments:
fid, fname = attachment.localpath.split('/')
filename = os.path.join('files', attachment.localpath)
ctype, encoding = mimetypes.guess_type(filename)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
with open(filename, 'rb') as fp:
msg = MIMEBase(maintype, subtype)
msg.set_payload(fp.read())
encoders.encode_base64(msg)
msg.add_header('Content-Disposition', 'attachment', filename=fname)
outer.attach(msg)
s = smtplib.SMTP_SSL('09a.de')
s.login('infomentor@09a.de', '***REMOVED***')
s.send_message(outer)
s.quit()
def update_homework(self):
session = db.get_db()
homeworklist = self.im.get_homework_list()
for homeworkid in homeworklist:
homework = session.query(model.Homework).filter(model.Homework.homework_id == homeworkid).with_parent(self.user, 'homeworks').one_or_none()
if homework is None:
homework = self.im.get_homework_info(homeworkid)
self._notify_hw(homework)
self.user.homeworks.append(homework)
session.commit()
def _notify_hw(self, hw):
if self.user.notification.ntype == model.Notification.Types.PUSHOVER:
self._notify_hw_pushover(hw)
elif self.user.notification.ntype == model.Notification.Types.EMAIL:
self._notify_hw_mail(hw)
else:
raise Exception('invalid notification')
pass
def _notify_hw_pushover(self, hw):
text = hw.text
for attachment in hw.attachments:
fid, fname = attachment.localpath.split('/')
text += '''<br>Attachment {0}: https://files.hyttioaoa.de/{1}<br>'''.format(fname, attachment.localpath)
parsed_date = dateparser.parse(hw.date)
if len(text) > 900:
url = self._make_site(text)
shorttext = text[:900]
text = '{}...\n\nfulltext saved at: {}'.format(shorttext, url)
text = text.replace('<br>', '\n')
try:
self.logger.info(text)
self.logger.info(hw.subject)
pushover.Client(self.user.notification.info).send_message(
text,
title=hw.title,
html=True,
)
except pushover.RequestError as e:
self.logger.error('Sending notification failed', exc_info=e)
def _notify_hw_mail(self, hw):
# Import the email modules we'll need
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import mimetypes
import smtplib
outer = MIMEMultipart()
text = hw.text.replace('<br>', '\n')
outer.attach(MIMEText(text + '\n\n'))
outer['Subject'] = f'INFOMENTOR Homework: {hw.subject}'
outer['From'] = 'infomentor@09a.de'
outer['To'] = self.user.notification.info
for attachment in hw.attachments:
fid, fname = attachment.localpath.split('/')
filename = os.path.join('files', attachment.localpath)
ctype, encoding = mimetypes.guess_type(filename)
if ctype is None or encoding is not None:
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
with open(filename, 'rb') as fp:
msg = MIMEBase(maintype, subtype)
msg.set_payload(fp.read())
encoders.encode_base64(msg)
msg.add_header('Content-Disposition', 'attachment', filename=fname)
outer.attach(msg)
s = smtplib.SMTP_SSL('09a.de')
s.login('infomentor@09a.de', '***REMOVED***')
s.send_message(outer)
s.quit()

144
infomentor/model.py Executable file
View File

@@ -0,0 +1,144 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, Enum, DateTime, Boolean
from sqlalchemy.orm import relationship
from Crypto.Cipher import AES
import base64
import enum
import hashlib
ModelBase = declarative_base()
_PASSWORD_SECRET_KEY = '***REMOVED***'
BS = 16
def pad(s):
diff = BS - len(s) % BS
return (s + (diff) * chr(diff)).encode('utf8')
def unpad(s):
return s[0:-s[-1]].decode('utf8')
class User(ModelBase):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
enc_password = Column(String)
notification = relationship("Notification", back_populates="user", uselist=False)
apistatus = relationship("ApiStatus", back_populates="user", uselist=False)
wantstatus = Column(Boolean)
homeworks = relationship("Homework", back_populates="user")
news = relationship("News",back_populates="user")
def __init__(self, *args, **kwargs):
self._setup_cipher()
super().__init__(*args, **kwargs)
def _setup_cipher(self):
if not hasattr(self, 'cipher'):
aeskey = hashlib.sha256(_PASSWORD_SECRET_KEY.encode()).digest()
self.cipher = AES.new(aeskey,AES.MODE_ECB)
@property
def password(self):
self._setup_cipher()
decoded = self.cipher.decrypt(base64.b64decode(self.enc_password))
return unpad(decoded)
@password.setter
def password(self, value):
self._setup_cipher()
encoded = base64.b64encode(self.cipher.encrypt(pad(value)))
self.enc_password = encoded
def __repr__(self):
return "<User(name='%s', password='%s')>" % (
self.name, '*' * len(self.password))
class Notification(ModelBase):
__tablename__ = 'notifications'
class Types(enum.Enum):
PUSHOVER = 1
EMAIL = 2
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'))
ntype = Column(Enum(Types))
info = Column(String)
user = relationship("User", back_populates="notification")
def __repr__(self):
return "<Notification(type='{}', info='{}')>".format(
self.ntype, self.info)
class Attachment(ModelBase):
__tablename__ = 'attachments'
id = Column(Integer, primary_key=True)
attachment_id = Column(Integer)
filetype = Column(String)
url = Column(String)
title = Column(String)
localpath = Column(String)
news_id = Column(Integer, ForeignKey('news.id'))
homework_id = Column(Integer, ForeignKey('homework.id'))
news = relationship("News", back_populates="attachments")
homework = relationship("Homework", back_populates="attachments")
class News(ModelBase):
__tablename__ = 'news'
id = Column(Integer, primary_key=True)
news_id = Column(Integer)
user_id = Column(Integer, ForeignKey('users.id'))
title = Column(String)
content = Column(String)
category = Column(String)
date = Column(String)
imageUrl = Column(String)
imagefile = Column(String)
notified = Column(Boolean, default=False)
raw = Column(String)
attachments = relationship("Attachment", order_by=Attachment.id, back_populates="news", uselist=True)
user = relationship("User", back_populates="news")
def __repr__(self):
return "<News(id='%d', title='%s')>" % (
self.id, self.title)
class Homework(ModelBase):
__tablename__ = 'homework'
id = Column(Integer, primary_key=True)
homework_id = Column(Integer)
user_id = Column(Integer, ForeignKey('users.id'))
subject = Column(String)
courseElement = Column(String)
text = Column(String)
date = Column(String)
imageUrl = Column(String)
attachments = relationship("Attachment", order_by=Attachment.id, back_populates="homework")
user = relationship("User", back_populates="homeworks")
class ApiStatus(ModelBase):
__tablename__ = 'api_status'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'))
degraded_count = Column(Integer)
datetime = Column(DateTime)
info = Column(String)
ok = Column(Boolean)
user = relationship("User", back_populates="apistatus", uselist=False)
def updateobj(self, data):
for key, value in data.items():
setattr(self, key, value)
def __repr__(self):
return "<ApiStatus(ok='%s', NOKs='%d', info='%s')>" % (
self.ok, self.degraded_count, self.info)

View File

@@ -1,4 +1,5 @@
requests
dataset
sqlalchemy
dateparser
python-pushover
pycrypto

12
setup.py Normal file
View File

@@ -0,0 +1,12 @@
from setuptools import setup, find_packages
setup(
name = 'infomentor',
version = '1.0.0',
url = 'https://github.com/mypackage.git',
author = 'Matthias Bilger',
author_email = 'matthias@bilger.info',
description = 'grab infomentor news and push them',
packages = find_packages(),
install_requires = ['pycrypt', 'requests'],
)