Improve periodic task creation and interval reuse
This commit is contained in:
@@ -1,9 +1,13 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
from ast import literal_eval
|
||||
from email.Utils import collapse_rfc2231_value
|
||||
from email import message_from_string
|
||||
import json
|
||||
import imaplib
|
||||
import logging
|
||||
import os
|
||||
import poplib
|
||||
|
||||
from django.contrib.contenttypes import generic
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
@@ -26,8 +30,8 @@ from .literals import (DEFAULT_INTERVAL, DEFAULT_POP3_TIMEOUT,
|
||||
SOURCE_CHOICES_PLURAL, SOURCE_CHOICE_STAGING,
|
||||
SOURCE_CHOICE_WATCH, SOURCE_CHOICE_WEB_FORM,
|
||||
SOURCE_INTERACTIVE_UNCOMPRESS_CHOICES,
|
||||
SOURCE_UNCOMPRESS_CHOICES, SOURCE_CHOICE_EMAIL_IMAP,
|
||||
SOURCE_CHOICE_EMAIL_POP3)
|
||||
SOURCE_UNCOMPRESS_CHOICES, SOURCE_UNCOMPRESS_CHOICE_Y,
|
||||
SOURCE_CHOICE_EMAIL_IMAP, SOURCE_CHOICE_EMAIL_POP3)
|
||||
from .managers import SourceTransformationManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -36,6 +40,7 @@ logger = logging.getLogger(__name__)
|
||||
class Source(models.Model):
|
||||
title = models.CharField(max_length=64, verbose_name=_(u'Title'))
|
||||
enabled = models.BooleanField(default=True, verbose_name=_(u'Enabled'))
|
||||
# TODO: remove whitelist and blacklists
|
||||
whitelist = models.TextField(blank=True, verbose_name=_(u'Whitelist'), editable=False)
|
||||
blacklist = models.TextField(blank=True, verbose_name=_(u'Blacklist'), editable=False)
|
||||
|
||||
@@ -199,32 +204,41 @@ class IntervalBaseModel(OutOfProcessSource):
|
||||
document_type = models.ForeignKey(DocumentType, null=True, blank=True, verbose_name=_('Document type'), help_text=_('Assign a document type to documents uploaded from this source.'))
|
||||
uncompress = models.CharField(max_length=1, choices=SOURCE_UNCOMPRESS_CHOICES, verbose_name=_('Uncompress'), help_text=_('Whether to expand or not, compressed archives.'))
|
||||
|
||||
def _get_periodic_task_name(self, pk=None):
|
||||
return 'check_interval_source-%i' % (pk or self.pk)
|
||||
|
||||
def _delete_periodic_task(self, pk=None):
|
||||
periodic_task = PeriodicTask.objects.get(name=self._get_periodic_task_name(pk))
|
||||
|
||||
interval_instance = periodic_task.interval
|
||||
|
||||
if tuple(interval_instance.periodictask_set.values_list('id', flat=True)) == (periodic_task.pk,):
|
||||
# Only delete the interval if nobody else is using it
|
||||
interval_instance.delete()
|
||||
else:
|
||||
periodic_task.delete()
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
new_source = not self.pk
|
||||
super(IntervalBaseModel, self).save(*args, **kwargs)
|
||||
periodic_task_name = 'check_interval_source-%i' % self.pk
|
||||
if new_source:
|
||||
interval_instance = IntervalSchedule.objects.create(every=self.interval, period='seconds')
|
||||
PeriodicTask.objects.create(
|
||||
name=periodic_task_name,
|
||||
interval=interval_instance,
|
||||
task='sources.tasks.task_check_interval_source',
|
||||
queue='mailing',
|
||||
kwargs=json.dumps({'source_id': self.pk})
|
||||
)
|
||||
else:
|
||||
periodic_task = PeriodicTask.objects.get(name=periodic_task_name)
|
||||
periodic_task.interval.every = self.interval
|
||||
periodic_task.interval.save()
|
||||
periodic_task.save()
|
||||
|
||||
if not new_source:
|
||||
self._delete_periodic_task()
|
||||
|
||||
interval_instance, created = IntervalSchedule.objects.get_or_create(every=self.interval, period='seconds')
|
||||
# Create a new interval or reuse someone else's
|
||||
PeriodicTask.objects.create(
|
||||
name=self._get_periodic_task_name(),
|
||||
interval=interval_instance,
|
||||
task='sources.tasks.task_check_interval_source',
|
||||
queue='mailing',
|
||||
kwargs=json.dumps({'source_id': self.pk})
|
||||
)
|
||||
|
||||
def delete(self, *args, **kwargs):
|
||||
periodic_task_name = 'check_interval_source-%i' % self.pk
|
||||
pk = self.pk
|
||||
super(IntervalBaseModel, self).delete(*args, **kwargs)
|
||||
periodic_task = PeriodicTask.objects.get(name=periodic_task_name)
|
||||
interval_instance = periodic_task.interval
|
||||
periodic_task.delete()
|
||||
interval_instance.delete()
|
||||
self._delete_periodic_task(pk)
|
||||
|
||||
class Meta:
|
||||
verbose_name = _('Interval source')
|
||||
@@ -239,6 +253,7 @@ class EmailBaseModel(IntervalBaseModel):
|
||||
password = models.CharField(max_length=96, verbose_name=_('Password'))
|
||||
|
||||
# From: http://bookmarks.honewatson.com/2009/08/11/python-gmail-imaplib-search-subject-get-attachments/
|
||||
# TODO: Add lock to avoid being running more than once concurrent
|
||||
@staticmethod
|
||||
def process_message(source, message):
|
||||
email = message_from_string(message)
|
||||
@@ -347,7 +362,7 @@ class IMAPEmail(EmailBaseModel):
|
||||
mailbox.close()
|
||||
mailbox.logout()
|
||||
except Exception as exception:
|
||||
logger.error('Unhandled exception: %s' % exc)
|
||||
logger.error('Unhandled exception: %s' % exception)
|
||||
# TODO: Add user notification
|
||||
|
||||
class Meta:
|
||||
|
||||
Reference in New Issue
Block a user