Initial pop3 email fetch task
This commit is contained in:
@@ -8,6 +8,7 @@ from common.utils import encapsulate
|
||||
from project_setup.api import register_setup
|
||||
from documents.permissions import (PERMISSION_DOCUMENT_NEW_VERSION,
|
||||
PERMISSION_DOCUMENT_CREATE)
|
||||
from scheduler.api import register_interval_job
|
||||
|
||||
from .staging import StagingFile
|
||||
from .models import (WebForm, StagingFolder, SourceTransformation,
|
||||
@@ -16,6 +17,7 @@ from .widgets import staging_file_thumbnail
|
||||
from .permissions import (PERMISSION_SOURCES_SETUP_VIEW,
|
||||
PERMISSION_SOURCES_SETUP_EDIT, PERMISSION_SOURCES_SETUP_DELETE,
|
||||
PERMISSION_SOURCES_SETUP_CREATE)
|
||||
from .tasks import task_fetch_pop3_emails
|
||||
|
||||
staging_file_preview = {'text': _(u'preview'), 'class': 'fancybox-noscaling', 'view': 'staging_file_preview', 'args': ['source.source_type', 'source.pk', 'object.id'], 'famfam': 'zoom', 'permissions': [PERMISSION_DOCUMENT_NEW_VERSION, PERMISSION_DOCUMENT_CREATE]}
|
||||
staging_file_delete = {'text': _(u'delete'), 'view': 'staging_file_delete', 'args': ['source.source_type', 'source.pk', 'object.id'], 'famfam': 'delete', 'keep_query': True, 'permissions': [PERMISSION_DOCUMENT_NEW_VERSION, PERMISSION_DOCUMENT_CREATE]}
|
||||
@@ -76,3 +78,5 @@ register_model_list_columns(StagingFile, [
|
||||
])
|
||||
|
||||
register_setup(setup_sources)
|
||||
|
||||
#register_interval_job('task_fetch_pop3_emails', _(u'Connects to the POP3 email sources and fetches the attached documents.'), task_fetch_pop3_emails, seconds=5)#QUEUE_PROCESSING_INTERVAL)
|
||||
|
||||
102
apps/sources/tasks.py
Normal file
102
apps/sources/tasks.py
Normal file
@@ -0,0 +1,102 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
from datetime import timedelta, datetime
|
||||
import platform
|
||||
import logging
|
||||
|
||||
from django.db.models import Q
|
||||
|
||||
from job_processor.api import process_job
|
||||
from lock_manager import Lock, LockError
|
||||
|
||||
#from .api import do_document_ocr
|
||||
#from .literals import (QUEUEDOCUMENT_STATE_PENDING,
|
||||
# QUEUEDOCUMENT_STATE_PROCESSING, DOCUMENTQUEUE_STATE_ACTIVE,
|
||||
# QUEUEDOCUMENT_STATE_ERROR)
|
||||
from .models import POP3Email
|
||||
#from .conf.settings import (NODE_CONCURRENT_EXECUTION, REPLICATION_DELAY,
|
||||
# QUEUE_PROCESSING_INTERVAL)
|
||||
|
||||
LOCK_EXPIRE = 30 * 1 # Lock expires in 10 minutes
|
||||
# TODO: Tie LOCK_EXPIRATION with hard task timeout
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
"""
|
||||
def task_process_queue_document(queue_document_id):
|
||||
lock_id = u'task_proc_queue_doc-%d' % queue_document_id
|
||||
try:
|
||||
logger.debug('trying to acquire lock: %s' % lock_id)
|
||||
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
|
||||
logger.debug('acquired lock: %s' % lock_id)
|
||||
queue_document = QueueDocument.objects.get(pk=queue_document_id)
|
||||
queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
|
||||
queue_document.node_name = platform.node()
|
||||
queue_document.save()
|
||||
try:
|
||||
do_document_ocr(queue_document)
|
||||
queue_document.delete()
|
||||
except Exception, e:
|
||||
queue_document.state = QUEUEDOCUMENT_STATE_ERROR
|
||||
queue_document.result = e
|
||||
queue_document.save()
|
||||
|
||||
lock.release()
|
||||
except LockError:
|
||||
logger.debug('unable to obtain lock')
|
||||
pass
|
||||
"""
|
||||
|
||||
def task_fetch_single_pop3_email(pop3_email):
|
||||
try:
|
||||
lock_id = u'task_fetch_pop3_email-%d' % pop3_email.pk
|
||||
logger.debug('trying to acquire lock: %s' % lock_id)
|
||||
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
|
||||
logger.debug('acquired lock: %s' % lock_id)
|
||||
#try:
|
||||
pop3_email.fetch_mail()
|
||||
#except Exception, exc:
|
||||
#raise
|
||||
#finally:
|
||||
lock.release()
|
||||
except LockError:
|
||||
logger.error('unable to obtain lock')
|
||||
pass
|
||||
|
||||
|
||||
def task_fetch_pop3_emails():
|
||||
logger.debug('executing')
|
||||
for pop3_email in POP3Email.objects.filter(enabled=True):
|
||||
try:
|
||||
task_fetch_single_pop3_email(pop3_email)
|
||||
except Exception, exc:
|
||||
logger.error('Unhandled exception: %s' % exc)
|
||||
|
||||
"""
|
||||
# TODO: reset_orphans()
|
||||
q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
|
||||
q_delayed = Q(delay=True)
|
||||
q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY))
|
||||
for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
|
||||
current_local_processing_count = QueueDocument.objects.filter(
|
||||
state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
|
||||
node_name=platform.node()).count()
|
||||
if current_local_processing_count < NODE_CONCURRENT_EXECUTION:
|
||||
try:
|
||||
oldest_queued_document_qs = document_queue.queuedocument_set.filter(
|
||||
(q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
|
||||
|
||||
if oldest_queued_document_qs:
|
||||
oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
|
||||
process_job(task_process_queue_document, oldest_queued_document.pk)
|
||||
except Exception, e:
|
||||
pass
|
||||
#print 'DocumentQueueWatcher exception: %s' % e
|
||||
finally:
|
||||
# Don't process anymore from this queryset, might be stale
|
||||
break
|
||||
else:
|
||||
logger.debug('already processing maximun')
|
||||
else:
|
||||
logger.debug('nothing to process')
|
||||
"""
|
||||
Reference in New Issue
Block a user