Merge branch 'scheduling'
This commit is contained in:
@@ -8,8 +8,12 @@ from permissions.api import register_permission, set_namespace_title
|
||||
from documents.models import Document
|
||||
from main.api import register_tool
|
||||
|
||||
from scheduler.api import register_interval_job
|
||||
|
||||
from ocr.conf.settings import AUTOMATIC_OCR
|
||||
from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL
|
||||
from ocr.models import DocumentQueue, QueueTransformation
|
||||
from ocr.tasks import task_process_document_queues
|
||||
|
||||
#Permissions
|
||||
PERMISSION_OCR_DOCUMENT = {'namespace': 'ocr', 'name': 'ocr_document', 'label': _(u'Submit document for OCR')}
|
||||
@@ -74,3 +78,5 @@ def document_post_save(sender, instance, **kwargs):
|
||||
DocumentQueue.objects.queue_document(instance)
|
||||
|
||||
post_save.connect(document_post_save, sender=Document)
|
||||
|
||||
register_interval_job(task_process_document_queues, seconds=QUEUE_PROCESSING_INTERVAL)
|
||||
|
||||
@@ -17,8 +17,8 @@ from ocr.literals import QUEUEDOCUMENT_STATE_PENDING, \
|
||||
from ocr.models import QueueDocument, DocumentQueue
|
||||
from ocr.conf.settings import NODE_CONCURRENT_EXECUTION
|
||||
from ocr.conf.settings import REPLICATION_DELAY
|
||||
from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL
|
||||
from ocr.conf.settings import CACHE_URI
|
||||
from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL
|
||||
|
||||
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
|
||||
# TODO: Tie LOCK_EXPIRE to the hard task timeout
|
||||
@@ -46,14 +46,14 @@ else:
|
||||
release_lock = lambda lock_id: True
|
||||
|
||||
|
||||
@task
|
||||
#@task
|
||||
def task_process_queue_document(queue_document_id):
|
||||
lock_id = u'%s-lock-%d' % (u'task_process_queue_document', queue_document_id)
|
||||
if acquire_lock(lock_id):
|
||||
queue_document = QueueDocument.objects.get(pk=queue_document_id)
|
||||
queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
|
||||
queue_document.node_name = platform.node()
|
||||
queue_document.result = task_process_queue_document.request.id
|
||||
#queue_document.result = task_process_queue_document.request.id
|
||||
queue_document.save()
|
||||
try:
|
||||
do_document_ocr(queue_document)
|
||||
@@ -66,6 +66,8 @@ def task_process_queue_document(queue_document_id):
|
||||
|
||||
|
||||
def reset_orphans():
|
||||
pass
|
||||
'''
|
||||
i = inspect().active()
|
||||
active_tasks = []
|
||||
orphans = []
|
||||
@@ -86,13 +88,12 @@ def reset_orphans():
|
||||
orphan.delay = False
|
||||
orphan.node_name = None
|
||||
orphan.save()
|
||||
'''
|
||||
|
||||
|
||||
|
||||
@periodic_task(run_every=timedelta(seconds=QUEUE_PROCESSING_INTERVAL))
|
||||
def task_process_document_queues():
|
||||
if not cache_backend:
|
||||
random_delay()
|
||||
|
||||
# reset_orphans()
|
||||
# Causes problems with big clusters (increased latency)
|
||||
# Disabled until better solution
|
||||
@@ -110,6 +111,7 @@ def task_process_document_queues():
|
||||
|
||||
if oldest_queued_document_qs:
|
||||
oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
|
||||
task_process_queue_document.delay(oldest_queued_document.pk)
|
||||
#task_process_queue_document.delay(oldest_queued_document.pk)
|
||||
task_process_queue_document(oldest_queued_document.pk)
|
||||
except Exception, e:
|
||||
print 'DocumentQueueWatcher exception: %s' % e
|
||||
|
||||
4
apps/scheduler/__init__.py
Normal file
4
apps/scheduler/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from apscheduler.scheduler import Scheduler
|
||||
|
||||
# Single Scheduler instance created at module level; scheduler.api imports it
# to register jobs. It is started right here, i.e. as a side effect of
# importing this package.
scheduler = Scheduler()
|
||||
scheduler.start()
|
||||
12
apps/scheduler/api.py
Normal file
12
apps/scheduler/api.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from scheduler import scheduler
|
||||
|
||||
registered_jobs = {}
|
||||
|
||||
|
||||
def register_interval_job(func, weeks=0, days=0, hours=0, minutes=0,
                          seconds=0, start_date=None, args=None,
                          kwargs=None, job_name=None, **options):
    """
    Register ``func`` as an interval job on the shared ``scheduler``.

    The interval components (``weeks``, ``days``, ``hours``, ``minutes``,
    ``seconds``), ``start_date`` and the call arguments (``args``,
    ``kwargs``) are handed unchanged to ``scheduler.add_interval_job``.

    ``job_name`` and any extra ``options`` are accepted but are currently
    NOT forwarded to the scheduler. Nothing is returned.
    """
    # Group the interval components so the scheduler call stays short.
    interval = {
        'weeks': weeks,
        'days': days,
        'hours': hours,
        'minutes': minutes,
        'seconds': seconds,
    }

    scheduler.add_interval_job(
        func=func, start_date=start_date, args=args, kwargs=kwargs,
        **interval
    )
|
||||
3
apps/scheduler/models.py
Normal file
3
apps/scheduler/models.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from django.db import models
|
||||
|
||||
# Create your models here.
|
||||
23
apps/scheduler/tests.py
Normal file
23
apps/scheduler/tests.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""
|
||||
This file demonstrates two different styles of tests (one doctest and one
|
||||
unittest). These will both pass when you run "manage.py test".
|
||||
|
||||
Replace these with more appropriate tests for your application.
|
||||
"""
|
||||
|
||||
from django.test import TestCase
|
||||
|
||||
class SimpleTest(TestCase):
    """Placeholder test case; replace with real tests for this app."""

    def test_basic_addition(self):
        """
        Tests that 1 + 1 always equals 2.
        """
        # assertEqual is the supported spelling; failUnlessEqual is a
        # deprecated alias that newer unittest versions no longer provide.
        self.assertEqual(1 + 1, 2)
|
||||
|
||||
__test__ = {"doctest": """
|
||||
Another way to test that 1 + 1 is equal to 2.
|
||||
|
||||
>>> 1 + 1 == 2
|
||||
True
|
||||
"""}
|
||||
|
||||
1
apps/scheduler/views.py
Normal file
1
apps/scheduler/views.py
Normal file
@@ -0,0 +1 @@
|
||||
# Create your views here.
|
||||
@@ -14,3 +14,4 @@ slate==0.3
|
||||
PIL==1.1.7
|
||||
ghostscript==0.4.1
|
||||
pdfminer==20110227
|
||||
APScheduler==2.0.2
|
||||
|
||||
@@ -11,3 +11,4 @@ slate==0.3
|
||||
PIL==1.1.7
|
||||
ghostscript==0.4.1
|
||||
pdfminer==20110227
|
||||
APScheduler==2.0.2
|
||||
|
||||
@@ -151,6 +151,7 @@ INSTALLED_APPS = (
|
||||
'document_indexing',
|
||||
'sources',
|
||||
'mimetype',
|
||||
'scheduler',
|
||||
)
|
||||
|
||||
TEMPLATE_CONTEXT_PROCESSORS = (
|
||||
|
||||
Reference in New Issue
Block a user