diff --git a/apps/ocr/__init__.py b/apps/ocr/__init__.py index 1460f2f210..ea655b8296 100755 --- a/apps/ocr/__init__.py +++ b/apps/ocr/__init__.py @@ -11,9 +11,8 @@ from documents.models import Document from models import DocumentQueue from literals import QUEUEDOCUMENT_STATE_PROCESSING, \ - DOCUMENTQUEUE_STATE_STOPPED, QUEUEDOCUMENT_STATE_PENDING - -from api import start_queue_watcher + QUEUEDOCUMENT_STATE_PENDING, DOCUMENTQUEUE_STATE_STOPPED, \ + DOCUMENTQUEUE_STATE_ACTIVE #Permissions PERMISSION_OCR_DOCUMENT = 'ocr_document' @@ -39,9 +38,8 @@ try: default_queue.save() for queue in DocumentQueue.objects.all(): - queue.state = DOCUMENTQUEUE_STATE_STOPPED + queue.state = DOCUMENTQUEUE_STATE_ACTIVE queue.save() - start_queue_watcher(queue.name) for document in queue.queuedocument_set.filter(state=QUEUEDOCUMENT_STATE_PROCESSING): document.state = QUEUEDOCUMENT_STATE_PENDING document.save() diff --git a/apps/ocr/api.py b/apps/ocr/api.py index e215a245d2..c0c73306b7 100755 --- a/apps/ocr/api.py +++ b/apps/ocr/api.py @@ -1,9 +1,6 @@ #Some code from http://wiki.github.com/hoffstaetter/python-tesseract import os -from multiprocessing import Process, Queue -from Queue import Empty - import subprocess import tempfile @@ -18,12 +15,11 @@ from converter.api import convert_document_for_ocr from ocr.conf.settings import TESSERACT_PATH -from literals import QUEUEDOCUMENT_STATE_PROCESSING, \ - QUEUEDOCUMENT_STATE_ERROR, QUEUEDOCUMENT_STATE_PENDING +#from literals import QUEUEDOCUMENT_STATE_PROCESSING, \ +# QUEUEDOCUMENT_STATE_ERROR, QUEUEDOCUMENT_STATE_PENDING -from models import DocumentQueue +#from models import DocumentQueue -queue_dict = {} def cleanup(filename): ''' tries to remove the given filename. Ignores non-existent files ''' @@ -57,8 +53,6 @@ def run_tesseract(input_filename, output_filename_base, lang=None): return (proc.wait(), proc.stderr.read()) -#def do_document_ocr(document): - def do_document_ocr(document): for page_index, document_page in enumerate(document.documentpage_set.all()): imagefile = convert_document_for_ocr(document, page=page_index) @@ -73,8 +67,6 @@ def do_document_ocr(document): f = file(ocr_output) try: - #document_page, created = DocumentPage.objects.get_or_create(document=document, - # page_number=page_index+1) document_page = document.documentpage_set.get(page_number=page_index+1) document_page.content = f.read().strip() document_page.page_label = _(u'Text from OCR') @@ -84,63 +76,3 @@ def do_document_ocr(document): cleanup(filepath) cleanup(ocr_output) cleanup(imagefile) - - -def do_queue_document(queue_document): - print 'do_queue_document' - queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING - queue_document.save() - - try: - do_document_ocr(queue_document.document) - queue_document.delete() - print 'ocr ended ok' - - except Exception, e: - print 'error', e - queue_document.state = QUEUEDOCUMENT_STATE_ERROR - queue_document.result = e - queue_document.save() - - - -def process_queue_document(queue_document): - #print 'process_queued_document' - #print 'test' ,queue_document.document.documentpage_set.all() - #print 'after' - d=Document.objects.get(id=42) - print d - print d.documentpage_set.all() - print 'after' - - p = Process(target=do_queue_document, args=(queue_document,)) - p.start() - - -def start_queue_watcher(queue_name): - - if queue_name in queue_dict: - print 'already started' - else: - queue_dict[queue_name] = Queue() - print 'start', queue_name - # if queue_name in queue_dict: - document_queue = DocumentQueue.objects.get(name=queue_name) - watcher = Process(target=queue_watcher, args=(document_queue,)) - watcher.start() - # else: - # raise Exception('No such queue: %s' % queue_name) - -import time -import sys -def queue_watcher(document_queue): - while True: - time.sleep(5) - try: - oldest_queued_document = document_queue.queuedocument_set.filter( - state=QUEUEDOCUMENT_STATE_PENDING).order_by('datetime_submitted')[0] - process_queue_document(oldest_queued_document) - print 'queue.get', oldest_queued_document - sys.stdout.flush() - except: - pass diff --git a/apps/ocr/conf/settings.py b/apps/ocr/conf/settings.py index de3c3980da..ec1f102fbf 100755 --- a/apps/ocr/conf/settings.py +++ b/apps/ocr/conf/settings.py @@ -1,3 +1,4 @@ from django.conf import settings TESSERACT_PATH = getattr(settings, 'OCR_TESSERACT_PATH', u'/usr/bin/tesseract') +MAX_CONCURRENT_EXECUTION = getattr(settings, 'OCR_MAX_CONCURRENT_EXECUTION', 2) diff --git a/apps/ocr/models.py b/apps/ocr/models.py index 30e82591ee..03d9b837bb 100755 --- a/apps/ocr/models.py +++ b/apps/ocr/models.py @@ -8,9 +8,6 @@ from literals import DOCUMENTQUEUE_STATE_STOPPED,\ QUEUEDOCUMENT_STATE_CHOICES -#from api import queue_dict - - class DocumentQueue(models.Model): name = models.CharField(max_length=64, unique=True, verbose_name=_(u'name')) label = models.CharField(max_length=64, verbose_name=_(u'label')) diff --git a/apps/ocr/tasks.py b/apps/ocr/tasks.py new file mode 100644 index 0000000000..cd324ad40c --- /dev/null +++ b/apps/ocr/tasks.py @@ -0,0 +1,50 @@ +from datetime import date, timedelta +from celery.task import Task, PeriodicTask +from celery.decorators import task + +from documents import Document + +from ocr.api import do_document_ocr +from literals import QUEUEDOCUMENT_STATE_PENDING, \ + QUEUEDOCUMENT_STATE_PROCESSING, DOCUMENTQUEUE_STATE_ACTIVE +from models import QueueDocument, DocumentQueue +from ocr.conf.settings import MAX_CONCURRENT_EXECUTION + +@task() +def do_document_ocr_task(document_id): + document = Document.objects.get(id=document_id) + do_document_ocr(document) + + +@task() +def do_queue_document(queue_document_id): + queue_document = QueueDocument.objects.get(id=queue_document_id) + queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING + queue_document.save() + try: + do_document_ocr(queue_document.document) + queue_document.delete() + except Exception, e: + queue_document.state = QUEUEDOCUMENT_STATE_ERROR + queue_document.result = e + queue_document.save() + + +class DocumentQueueWatcher(PeriodicTask): + run_every = timedelta(seconds=5) + + def run(self, **kwargs): + logger = self.get_logger(**kwargs) + logger.info('Running queue watcher.') + for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE): + current_running_queues = QueueDocument.objects.filter(state=QUEUEDOCUMENT_STATE_PROCESSING).count() + if current_running_queues < MAX_CONCURRENT_EXECUTION: + try: + oldest_queued_document = document_queue.queuedocument_set.filter( + state=QUEUEDOCUMENT_STATE_PENDING).order_by('datetime_submitted')[0] + + do_queue_document(oldest_queued_document.id).delay() + except: + #No Documents in queue + pass + return True diff --git a/apps/ocr/views.py b/apps/ocr/views.py index 5f51841089..f39b902cc7 100755 --- a/apps/ocr/views.py +++ b/apps/ocr/views.py @@ -15,6 +15,7 @@ from ocr import PERMISSION_OCR_DOCUMENT from models import DocumentQueue, QueueDocument +from tasks import do_document_ocr_task def submit_document(request, document_id, queue_name='default'): permissions = [PERMISSION_OCR_DOCUMENT] @@ -26,9 +27,13 @@ def submit_document(request, document_id, queue_name='default'): document = get_object_or_404(Document, pk=document_id) document_queue = get_object_or_404(DocumentQueue, name=queue_name) - #document_queue.add_document(document) - queue_document = QueueDocument(document_queue=document_queue, document=document) - queue_document.save() + do_document_ocr_task.delay(document.id) + ##document_queue.add_document(document) + #queue_document = QueueDocument(document_queue=document_queue, document=document) + #queue_document.save() + + + #add.delay(1,2) messages.success(request, _(u'Document: %s was added to the OCR queue: %s.') % (document, document_queue.label)) return HttpResponseRedirect(request.META['HTTP_REFERER']) diff --git a/docs/CREDITS b/docs/CREDITS index e109ae11d3..e22124281a 100755 --- a/docs/CREDITS +++ b/docs/CREDITS @@ -75,3 +75,17 @@ Fancybox - FancyBox is a tool for displaying images, html content and unpaper - post-processing scanned and photocopied book pages Jens Gulden 2005-2007 - unpaper@jensgulden.de. http://unpaper.berlios.de/ + +celery - Celery is an open source asynchronous task queue/job queue + based on distributed message passing. It is focused on real-time + operation, but supports scheduling as well. + Copyright 2009-2011, Ask Solem & contributors + http://ask.github.com/celery/getting-started/introduction.html + +django-celery - django-celery provides Celery integration for Django; + Using the Django ORM and cache backend for storing + results, autodiscovery of task modules for applications + listed in INSTALLED_APPS, and more. + Copyright Ask Solem & contributors + http://github.com/ask/django-celery/ + diff --git a/docs/Changelog.txt b/docs/Changelog.txt index 7554a9b691..afe80c3085 100644 --- a/docs/Changelog.txt +++ b/docs/Changelog.txt @@ -12,3 +12,4 @@ * Added views to create, edit and grant/revoke permissions to roles * Apply default transformations to document before OCR * Added unpaper to the OCR convertion pipe +* Added support for concurrent, queued OCR processing using celery diff --git a/docs/TODO b/docs/TODO index 390faf48d7..52d43c5cc0 100755 --- a/docs/TODO +++ b/docs/TODO @@ -32,7 +32,11 @@ * DB stored transformations - DONE * Recognize multi-page documents - DONE * Add unpaper to pre OCR document cleanup - DONE +* Count pages in a PDF file http://pybrary.net/pyPdf/ - NOT NEEDED +* Support distributed OCR queues (RabbitMQ & Celery?) - DONE +* MuliThreading deferred OCR - DONE * Role editing view under setup - STARTED +* Scheduled maintenance (cleanup, deferred OCR's) - DONE * Document list filtering by metadata * Filterform date filtering widget * Validate GET data before saving file @@ -46,20 +50,15 @@ from a queryset * Allow metadata entry form to mix required and non required metadata * Link to delete and recreate all document links -* MuliThreading deferred OCR * Versioning support * Generic document anotations using layer overlays * Workflows -* Scheduled maintenance (cleanup, deferred OCR's) * Add tags to documents * Field for document language or autodetect -* Count pages in a PDF file http://pybrary.net/pyPdf/ -* Download a document in diffent formats: (jpg, png, pdf) * Download a document in diffent formats: (jpg, png, pdf) * Cache.cleanup function to delete cached images when document hash changes * Divide navigation links search by object and by view * Add show_summary method to model to display as results of a search -* Support distributed OCR queues (RabbitMQ & Celery?) * DXF viewer - http://code.google.com/p/dxf-reader/source/browse/#svn%2Ftrunk * Support spreadsheets, wordprocessing docs using openoffice in server mode * WebDAV support diff --git a/requirements/development.txt b/requirements/development.txt index bd77b9b189..4094f10871 100755 --- a/requirements/development.txt +++ b/requirements/development.txt @@ -5,3 +5,6 @@ django-extensions==0.6 django-pagination==1.0.7 django-rosetta==0.5.6 wsgiref==0.1.2 +celery==2.2.2 +django-celery==2.2.2 + diff --git a/requirements/production.txt b/requirements/production.txt index d8147de1ec..b58d327e2c 100755 --- a/requirements/production.txt +++ b/requirements/production.txt @@ -2,3 +2,6 @@ Django==1.2.4 distribute==0.6.10 django-pagination==1.0.7 wsgiref==0.1.2 +celery==2.2.2 +django-celery==2.2.2 + diff --git a/settings.py b/settings.py index ba076de626..c218b573f9 100755 --- a/settings.py +++ b/settings.py @@ -127,6 +127,7 @@ INSTALLED_APPS = ( 'converter', 'ocr', 'permissions', + 'djcelery', ) TEMPLATE_CONTEXT_PROCESSORS = ( @@ -220,12 +221,22 @@ LOGIN_EXEMPT_URLS = ( # OCR #OCR_TESSERACT_PATH = u'/usr/bin/tesseract' +#OCR_MAX_CONCURRENT_EXECUTION = 2 # Permissions #ROLES_DEFAULT_ROLES = [] # Override SEARCH_SHOW_OBJECT_TYPE = False +#----------- django-celery -------------- +import djcelery +djcelery.setup_loader() +BROKER_HOST = "localhost" +BROKER_PORT = 5672 +BROKER_USER = "guest" +BROKER_PASSWORD = "guest" +BROKER_VHOST = "/" +CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler' #======== End of configuration options ======= try: diff --git a/wsgi/dispatch.wsgi b/wsgi/dispatch.wsgi index c9fb781b92..1c9b4fc9fc 100755 --- a/wsgi/dispatch.wsgi +++ b/wsgi/dispatch.wsgi @@ -16,6 +16,7 @@ sys.path.insert(0, ve_path) # Avoid ``[Errno 13] Permission denied: '/var/www/.python-eggs'`` messages os.environ['PYTHON_EGG_CACHE'] = '/tmp' os.environ['DJANGO_SETTINGS_MODULE'] = 'settings' +os.environ['CELERY_LOADER'] = 'django' from django.core.handlers.wsgi import WSGIHandler application = WSGIHandler()