diff --git a/apps/sources/__init__.py b/apps/sources/__init__.py
index d03a2a1f6f..3cd5b85b62 100644
--- a/apps/sources/__init__.py
+++ b/apps/sources/__init__.py
@@ -8,6 +8,7 @@ from common.utils import encapsulate
 from project_setup.api import register_setup
 from documents.permissions import (PERMISSION_DOCUMENT_NEW_VERSION,
     PERMISSION_DOCUMENT_CREATE)
+from scheduler.api import register_interval_job
 
 from .staging import StagingFile
 from .models import (WebForm, StagingFolder, SourceTransformation,
@@ -16,6 +17,7 @@ from .widgets import staging_file_thumbnail
 from .permissions import (PERMISSION_SOURCES_SETUP_VIEW,
     PERMISSION_SOURCES_SETUP_EDIT, PERMISSION_SOURCES_SETUP_DELETE,
     PERMISSION_SOURCES_SETUP_CREATE)
+from .tasks import task_fetch_pop3_emails
 
 staging_file_preview = {'text': _(u'preview'), 'class': 'fancybox-noscaling', 'view': 'staging_file_preview', 'args': ['source.source_type', 'source.pk', 'object.id'], 'famfam': 'zoom', 'permissions': [PERMISSION_DOCUMENT_NEW_VERSION, PERMISSION_DOCUMENT_CREATE]}
 staging_file_delete = {'text': _(u'delete'), 'view': 'staging_file_delete', 'args': ['source.source_type', 'source.pk', 'object.id'], 'famfam': 'delete', 'keep_query': True, 'permissions': [PERMISSION_DOCUMENT_NEW_VERSION, PERMISSION_DOCUMENT_CREATE]}
@@ -76,3 +78,5 @@ register_model_list_columns(StagingFile, [
 ])
 
 register_setup(setup_sources)
+
+#register_interval_job('task_fetch_pop3_emails', _(u'Connects to the POP3 email sources and fetches the attached documents.'), task_fetch_pop3_emails, seconds=5)  # TODO: replace the hardcoded 5 seconds with QUEUE_PROCESSING_INTERVAL before enabling
diff --git a/apps/sources/tasks.py b/apps/sources/tasks.py
new file mode 100644
index 0000000000..be2a11ac3c
--- /dev/null
+++ b/apps/sources/tasks.py
@@ -0,0 +1,102 @@
+from __future__ import absolute_import
+
+from datetime import timedelta, datetime
+import platform
+import logging
+
+from django.db.models import Q
+
+from job_processor.api import process_job
+from lock_manager import Lock, LockError
+
+#from .api import do_document_ocr
+#from .literals import (QUEUEDOCUMENT_STATE_PENDING,
+#    QUEUEDOCUMENT_STATE_PROCESSING, DOCUMENTQUEUE_STATE_ACTIVE,
+#    QUEUEDOCUMENT_STATE_ERROR)
+from .models import POP3Email
+#from .conf.settings import (NODE_CONCURRENT_EXECUTION, REPLICATION_DELAY,
+#    QUEUE_PROCESSING_INTERVAL)
+
+LOCK_EXPIRE = 30  # Lock expires in 30 seconds
+# TODO: Tie LOCK_EXPIRE to the hard task timeout
+
+logger = logging.getLogger(__name__)
+
+"""
+def task_process_queue_document(queue_document_id):
+    lock_id = u'task_proc_queue_doc-%d' % queue_document_id
+    try:
+        logger.debug('trying to acquire lock: %s' % lock_id)
+        lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
+        logger.debug('acquired lock: %s' % lock_id)
+        queue_document = QueueDocument.objects.get(pk=queue_document_id)
+        queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
+        queue_document.node_name = platform.node()
+        queue_document.save()
+        try:
+            do_document_ocr(queue_document)
+            queue_document.delete()
+        except Exception as e:
+            queue_document.state = QUEUEDOCUMENT_STATE_ERROR
+            queue_document.result = e
+            queue_document.save()
+
+        lock.release()
+    except LockError:
+        logger.debug('unable to obtain lock')
+"""
+
+
+def task_fetch_single_pop3_email(pop3_email):
+    lock_id = u'task_fetch_pop3_email-%d' % pop3_email.pk
+    try:
+        logger.debug('trying to acquire lock: %s' % lock_id)
+        lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
+        logger.debug('acquired lock: %s' % lock_id)
+        try:
+            pop3_email.fetch_mail()
+        finally:
+            # Release the lock even if fetch_mail() raises, so the
+            # source is retried on the next scheduled run.
+            lock.release()
+    except LockError:
+        # Another node is already fetching from this source; skip it.
+        logger.error('unable to obtain lock')
+
+
+def task_fetch_pop3_emails():
+    logger.debug('executing')
+    for pop3_email in POP3Email.objects.filter(enabled=True):
+        try:
+            task_fetch_single_pop3_email(pop3_email)
+        except Exception as exc:
+            logger.error('Unhandled exception: %s' % exc)
+
+    """
+    # TODO: reset_orphans()
+    q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
+    q_delayed = Q(delay=True)
+    q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY))
+    for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
+        current_local_processing_count = QueueDocument.objects.filter(
+            state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
+            node_name=platform.node()).count()
+        if current_local_processing_count < NODE_CONCURRENT_EXECUTION:
+            try:
+                oldest_queued_document_qs = document_queue.queuedocument_set.filter(
+                    (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
+
+                if oldest_queued_document_qs:
+                    oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
+                    process_job(task_process_queue_document, oldest_queued_document.pk)
+            except Exception as e:
+                pass
+                #print 'DocumentQueueWatcher exception: %s' % e
+            finally:
+                # Don't process any more from this queryset, it might be stale
+                break
+        else:
+            logger.debug('already processing maximum')
+    else:
+        logger.debug('nothing to process')
+    """
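
Review note on apps/sources/tasks.py: the per-source lock is what keeps two scheduler nodes from fetching the same POP3 mailbox at once, and the try/finally around fetch_mail() is what keeps a failed fetch from leaving the lock held until it expires. A minimal sketch of the idiom, assuming only the Lock.acquire_lock(name, timeout) / lock.release() API from lock_manager that the diff itself uses; the run_exclusive helper is hypothetical:

    from lock_manager import Lock, LockError

    LOCK_EXPIRE = 30  # seconds; keep above the longest expected fetch

    def run_exclusive(lock_id, work):
        # Hypothetical helper: run `work` only if the named lock is free.
        try:
            lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
        except LockError:
            # Another node holds the lock; skip rather than block.
            return False
        try:
            work()
            return True
        finally:
            # Always release; otherwise the source stays locked until expiry.
            lock.release()

Skipping on LockError instead of blocking is what makes it safe to register the same interval job on every node.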
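
Review note on the commented-out registration in apps/sources/__init__.py: once the interval constant is settled, enabling the scheduler hook should amount to uncommenting that call. A sketch of the enabled form, using the register_interval_job(name, label, func, seconds=...) signature exactly as the diff calls it; POP3_PROCESSING_INTERVAL is a hypothetical stand-in for the final setting (the diff hardcodes seconds=5 for testing):

    from django.utils.translation import ugettext_lazy as _

    from scheduler.api import register_interval_job

    from .tasks import task_fetch_pop3_emails

    POP3_PROCESSING_INTERVAL = 60  # hypothetical; to come from .conf.settings

    register_interval_job(
        'task_fetch_pop3_emails',
        _(u'Connects to the POP3 email sources and fetches the attached documents.'),
        task_fetch_pop3_emails,
        seconds=POP3_PROCESSING_INTERVAL
    )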