diff --git a/apps/ocr/__init__.py b/apps/ocr/__init__.py index 00efe7b276..1bed491709 100644 --- a/apps/ocr/__init__.py +++ b/apps/ocr/__init__.py @@ -8,8 +8,12 @@ from permissions.api import register_permission, set_namespace_title from documents.models import Document from main.api import register_tool +from scheduler.api import register_interval_job + from ocr.conf.settings import AUTOMATIC_OCR +from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL from ocr.models import DocumentQueue, QueueTransformation +from ocr.tasks import task_process_document_queues #Permissions PERMISSION_OCR_DOCUMENT = {'namespace': 'ocr', 'name': 'ocr_document', 'label': _(u'Submit document for OCR')} @@ -74,3 +78,5 @@ def document_post_save(sender, instance, **kwargs): DocumentQueue.objects.queue_document(instance) post_save.connect(document_post_save, sender=Document) + +register_interval_job(task_process_document_queues, seconds=QUEUE_PROCESSING_INTERVAL) diff --git a/apps/ocr/tasks.py b/apps/ocr/tasks.py index 8d44522c1e..4da8c4eaf0 100644 --- a/apps/ocr/tasks.py +++ b/apps/ocr/tasks.py @@ -17,8 +17,8 @@ from ocr.literals import QUEUEDOCUMENT_STATE_PENDING, \ from ocr.models import QueueDocument, DocumentQueue from ocr.conf.settings import NODE_CONCURRENT_EXECUTION from ocr.conf.settings import REPLICATION_DELAY -from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL from ocr.conf.settings import CACHE_URI +from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes # TODO: Tie LOCK_EXPIRATION with hard task timeout @@ -46,14 +46,14 @@ else: release_lock = lambda lock_id: True -@task +#@task def task_process_queue_document(queue_document_id): lock_id = u'%s-lock-%d' % (u'task_process_queue_document', queue_document_id) if acquire_lock(lock_id): queue_document = QueueDocument.objects.get(pk=queue_document_id) queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING queue_document.node_name = platform.node() - queue_document.result = task_process_queue_document.request.id + #queue_document.result = task_process_queue_document.request.id queue_document.save() try: do_document_ocr(queue_document) @@ -66,6 +66,8 @@ def task_process_queue_document(queue_document_id): def reset_orphans(): + pass + ''' i = inspect().active() active_tasks = [] orphans = [] @@ -86,13 +88,12 @@ def reset_orphans(): orphan.delay = False orphan.node_name = None orphan.save() + ''' + - -@periodic_task(run_every=timedelta(seconds=QUEUE_PROCESSING_INTERVAL)) def task_process_document_queues(): if not cache_backend: random_delay() - # reset_orphans() # Causes problems with big clusters increased latency # Disabled until better solution @@ -110,6 +111,7 @@ def task_process_document_queues(): if oldest_queued_document_qs: oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0] - task_process_queue_document.delay(oldest_queued_document.pk) + #task_process_queue_document.delay(oldest_queued_document.pk) + task_process_queue_document(oldest_queued_document.pk) except Exception, e: print 'DocumentQueueWatcher exception: %s' % e diff --git a/apps/scheduler/__init__.py b/apps/scheduler/__init__.py new file mode 100644 index 0000000000..a9440e946b --- /dev/null +++ b/apps/scheduler/__init__.py @@ -0,0 +1,4 @@ +from apscheduler.scheduler import Scheduler + +scheduler = Scheduler() +scheduler.start() diff --git a/apps/scheduler/api.py b/apps/scheduler/api.py new file mode 100644 index 0000000000..4c59b41fa5 --- /dev/null +++ b/apps/scheduler/api.py @@ -0,0 +1,12 @@ +from scheduler import scheduler + +registered_jobs = {} + + +def register_interval_job(func, weeks=0, days=0, hours=0, minutes=0, + seconds=0, start_date=None, args=None, + kwargs=None, job_name=None, **options): + + scheduler.add_interval_job(func=func, weeks=weeks, days=days, + hours=hours, minutes=minutes, seconds=seconds, + start_date=start_date, args=args, kwargs=kwargs)#, **options) diff --git a/apps/scheduler/models.py b/apps/scheduler/models.py new file mode 100644 index 0000000000..71a8362390 --- /dev/null +++ b/apps/scheduler/models.py @@ -0,0 +1,3 @@ +from django.db import models + +# Create your models here. diff --git a/apps/scheduler/tests.py b/apps/scheduler/tests.py new file mode 100644 index 0000000000..2247054b35 --- /dev/null +++ b/apps/scheduler/tests.py @@ -0,0 +1,23 @@ +""" +This file demonstrates two different styles of tests (one doctest and one +unittest). These will both pass when you run "manage.py test". + +Replace these with more appropriate tests for your application. +""" + +from django.test import TestCase + +class SimpleTest(TestCase): + def test_basic_addition(self): + """ + Tests that 1 + 1 always equals 2. + """ + self.failUnlessEqual(1 + 1, 2) + +__test__ = {"doctest": """ +Another way to test that 1 + 1 is equal to 2. + +>>> 1 + 1 == 2 +True +"""} + diff --git a/apps/scheduler/views.py b/apps/scheduler/views.py new file mode 100644 index 0000000000..60f00ef0ef --- /dev/null +++ b/apps/scheduler/views.py @@ -0,0 +1 @@ +# Create your views here. diff --git a/requirements/development.txt b/requirements/development.txt index caa9259bc0..356a20d2f9 100644 --- a/requirements/development.txt +++ b/requirements/development.txt @@ -14,3 +14,4 @@ slate==0.3 PIL==1.1.7 ghostscript==0.4.1 pdfminer==20110227 +APScheduler==2.0.2 diff --git a/requirements/production.txt b/requirements/production.txt index df77d598ad..db4dd4d76a 100644 --- a/requirements/production.txt +++ b/requirements/production.txt @@ -11,3 +11,4 @@ slate==0.3 PIL==1.1.7 ghostscript==0.4.1 pdfminer==20110227 +APScheduler==2.0.2 diff --git a/settings.py b/settings.py index d47b7d85ff..11f05f2b14 100644 --- a/settings.py +++ b/settings.py @@ -151,6 +151,7 @@ INSTALLED_APPS = ( 'document_indexing', 'sources', 'mimetype', + 'scheduler', ) TEMPLATE_CONTEXT_PROCESSORS = (