Changed from python's multiprocessing to celery to handle concurrency

This commit is contained in:
Roberto Rosario
2011-02-17 03:45:30 -04:00
parent 409a52af95
commit 478fb3502e
13 changed files with 102 additions and 87 deletions

View File

@@ -11,9 +11,8 @@ from documents.models import Document
from models import DocumentQueue
from literals import QUEUEDOCUMENT_STATE_PROCESSING, \
DOCUMENTQUEUE_STATE_STOPPED, QUEUEDOCUMENT_STATE_PENDING
from api import start_queue_watcher
QUEUEDOCUMENT_STATE_PENDING, DOCUMENTQUEUE_STATE_STOPPED, \
DOCUMENTQUEUE_STATE_ACTIVE
#Permissions
PERMISSION_OCR_DOCUMENT = 'ocr_document'
@@ -39,9 +38,8 @@ try:
default_queue.save()
for queue in DocumentQueue.objects.all():
queue.state = DOCUMENTQUEUE_STATE_STOPPED
queue.state = DOCUMENTQUEUE_STATE_ACTIVE
queue.save()
start_queue_watcher(queue.name)
for document in queue.queuedocument_set.filter(state=QUEUEDOCUMENT_STATE_PROCESSING):
document.state = QUEUEDOCUMENT_STATE_PENDING
document.save()

View File

@@ -1,9 +1,6 @@
#Some code from http://wiki.github.com/hoffstaetter/python-tesseract
import os
from multiprocessing import Process, Queue
from Queue import Empty
import subprocess
import tempfile
@@ -18,12 +15,11 @@ from converter.api import convert_document_for_ocr
from ocr.conf.settings import TESSERACT_PATH
from literals import QUEUEDOCUMENT_STATE_PROCESSING, \
QUEUEDOCUMENT_STATE_ERROR, QUEUEDOCUMENT_STATE_PENDING
#from literals import QUEUEDOCUMENT_STATE_PROCESSING, \
# QUEUEDOCUMENT_STATE_ERROR, QUEUEDOCUMENT_STATE_PENDING
from models import DocumentQueue
#from models import DocumentQueue
queue_dict = {}
def cleanup(filename):
''' tries to remove the given filename. Ignores non-existent files '''
@@ -57,8 +53,6 @@ def run_tesseract(input_filename, output_filename_base, lang=None):
return (proc.wait(), proc.stderr.read())
#def do_document_ocr(document):
def do_document_ocr(document):
for page_index, document_page in enumerate(document.documentpage_set.all()):
imagefile = convert_document_for_ocr(document, page=page_index)
@@ -73,8 +67,6 @@ def do_document_ocr(document):
f = file(ocr_output)
try:
#document_page, created = DocumentPage.objects.get_or_create(document=document,
# page_number=page_index+1)
document_page = document.documentpage_set.get(page_number=page_index+1)
document_page.content = f.read().strip()
document_page.page_label = _(u'Text from OCR')
@@ -84,63 +76,3 @@ def do_document_ocr(document):
cleanup(filepath)
cleanup(ocr_output)
cleanup(imagefile)
def do_queue_document(queue_document):
print 'do_queue_document'
queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
queue_document.save()
try:
do_document_ocr(queue_document.document)
queue_document.delete()
print 'ocr ended ok'
except Exception, e:
print 'error', e
queue_document.state = QUEUEDOCUMENT_STATE_ERROR
queue_document.result = e
queue_document.save()
def process_queue_document(queue_document):
    """Start OCR for ``queue_document`` in a separate process.

    Spawns a multiprocessing ``Process`` that runs do_queue_document on the
    given entry and returns without waiting for it to finish.
    """
    # The hard-coded Document id=42 lookup and the prints around it were
    # debugging leftovers; the lookup raises when that row does not exist,
    # which stopped the worker process from ever being started.
    worker = Process(target=do_queue_document, args=(queue_document,))
    worker.start()
# Start a watcher process for the document queue called queue_name.
# queue_dict tracks the queue names seen so far: a name already present
# prints 'already started', otherwise a Queue() is stored under that name.
# The DocumentQueue row is looked up by name and handed to queue_watcher,
# which is run in its own Process.
# NOTE(review): indentation is not visible here, so it is unclear whether the
# lookup and Process start below belong to the else branch or run on every
# call - confirm against the original file before relying on this.
def start_queue_watcher(queue_name):
if queue_name in queue_dict:
print 'already started'
else:
queue_dict[queue_name] = Queue()
print 'start', queue_name
# if queue_name in queue_dict:
document_queue = DocumentQueue.objects.get(name=queue_name)
# queue_watcher loops forever, so it gets a dedicated process.
watcher = Process(target=queue_watcher, args=(document_queue,))
watcher.start()
# else:
# raise Exception('No such queue: %s' % queue_name)
import time
import sys
def queue_watcher(document_queue):
while True:
time.sleep(5)
try:
oldest_queued_document = document_queue.queuedocument_set.filter(
state=QUEUEDOCUMENT_STATE_PENDING).order_by('datetime_submitted')[0]
process_queue_document(oldest_queued_document)
print 'queue.get', oldest_queued_document
sys.stdout.flush()
except:
pass

View File

@@ -1,3 +1,4 @@
from django.conf import settings
# Path used for tesseract; taken from the OCR_TESSERACT_PATH Django setting
# when present, otherwise /usr/bin/tesseract.
TESSERACT_PATH = getattr(settings, 'OCR_TESSERACT_PATH', u'/usr/bin/tesseract')
# Limit on queue documents in the processing state; taken from the
# OCR_MAX_CONCURRENT_EXECUTION Django setting when present, otherwise 2.
MAX_CONCURRENT_EXECUTION = getattr(settings, 'OCR_MAX_CONCURRENT_EXECUTION', 2)

View File

@@ -8,9 +8,6 @@ from literals import DOCUMENTQUEUE_STATE_STOPPED,\
QUEUEDOCUMENT_STATE_CHOICES
#from api import queue_dict
class DocumentQueue(models.Model):
name = models.CharField(max_length=64, unique=True, verbose_name=_(u'name'))
label = models.CharField(max_length=64, verbose_name=_(u'label'))

50
apps/ocr/tasks.py Normal file
View File

@@ -0,0 +1,50 @@
from datetime import date, timedelta
from celery.task import Task, PeriodicTask
from celery.decorators import task
from documents import Document
from ocr.api import do_document_ocr
from literals import QUEUEDOCUMENT_STATE_PENDING, \
QUEUEDOCUMENT_STATE_PROCESSING, DOCUMENTQUEUE_STATE_ACTIVE
from models import QueueDocument, DocumentQueue
from ocr.conf.settings import MAX_CONCURRENT_EXECUTION
@task()
def do_document_ocr_task(document_id):
    """Celery task: run OCR on the document whose id is ``document_id``.

    The document is fetched through the ORM and handed to do_document_ocr.
    """
    do_document_ocr(Document.objects.get(id=document_id))
@task()
def do_queue_document(queue_document_id):
queue_document = QueueDocument.objects.get(id=queue_document_id)
queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
queue_document.save()
try:
do_document_ocr(queue_document.document)
queue_document.delete()
except Exception, e:
queue_document.state = QUEUEDOCUMENT_STATE_ERROR
queue_document.result = e
queue_document.save()
class DocumentQueueWatcher(PeriodicTask):
    """Periodic task that hands pending OCR entries to do_queue_document.

    Runs every 5 seconds. For each active DocumentQueue, while fewer than
    MAX_CONCURRENT_EXECUTION queue documents are in the processing state,
    the oldest pending entry (by ``datetime_submitted``) is dispatched as an
    asynchronous do_queue_document task.
    """
    run_every = timedelta(seconds=5)

    def run(self, **kwargs):
        """Dispatch at most one pending entry per active queue; return True."""
        logger = self.get_logger(**kwargs)
        logger.info('Running queue watcher.')
        for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
            current_running_queues = QueueDocument.objects.filter(
                state=QUEUEDOCUMENT_STATE_PROCESSING).count()
            if current_running_queues >= MAX_CONCURRENT_EXECUTION:
                continue
            pending = document_queue.queuedocument_set.filter(
                state=QUEUEDOCUMENT_STATE_PENDING).order_by('datetime_submitted')
            try:
                oldest_queued_document = pending[0]
            except IndexError:
                #No Documents in queue
                continue
            # Call .delay() on the task itself. The previous form,
            # do_queue_document(id).delay(), ran the OCR synchronously inside
            # this periodic task and then failed calling .delay() on the
            # return value, an error the old bare except silently discarded.
            # NOTE(review): the entry stays pending until a worker picks the
            # task up, so a later run could dispatch it again - confirm
            # whether it should be flagged before dispatch.
            do_queue_document.delay(oldest_queued_document.id)
        return True

View File

@@ -15,6 +15,7 @@ from ocr import PERMISSION_OCR_DOCUMENT
from models import DocumentQueue, QueueDocument
from tasks import do_document_ocr_task
def submit_document(request, document_id, queue_name='default'):
permissions = [PERMISSION_OCR_DOCUMENT]
@@ -26,9 +27,13 @@ def submit_document(request, document_id, queue_name='default'):
document = get_object_or_404(Document, pk=document_id)
document_queue = get_object_or_404(DocumentQueue, name=queue_name)
#document_queue.add_document(document)
queue_document = QueueDocument(document_queue=document_queue, document=document)
queue_document.save()
do_document_ocr_task.delay(document.id)
##document_queue.add_document(document)
#queue_document = QueueDocument(document_queue=document_queue, document=document)
#queue_document.save()
#add.delay(1,2)
messages.success(request, _(u'Document: %s was added to the OCR queue: %s.') % (document, document_queue.label))
return HttpResponseRedirect(request.META['HTTP_REFERER'])

View File

@@ -75,3 +75,17 @@ Fancybox - FancyBox is a tool for displaying images, html content and
unpaper - post-processing scanned and photocopied book pages
Jens Gulden 2005-2007 - unpaper@jensgulden.de.
http://unpaper.berlios.de/
celery - Celery is an open source asynchronous task queue/job queue
based on distributed message passing. It is focused on real-time
operation, but supports scheduling as well.
Copyright 2009-2011, Ask Solem & contributors
http://ask.github.com/celery/getting-started/introduction.html
django-celery - django-celery provides Celery integration for Django;
Using the Django ORM and cache backend for storing
results, autodiscovery of task modules for applications
listed in INSTALLED_APPS, and more.
Copyright Ask Solem & contributors
http://github.com/ask/django-celery/

View File

@@ -12,3 +12,4 @@
* Added views to create, edit and grant/revoke permissions to roles
* Apply default transformations to document before OCR
* Added unpaper to the OCR conversion pipe
* Added support for concurrent, queued OCR processing using celery

View File

@@ -32,7 +32,11 @@
* DB stored transformations - DONE
* Recognize multi-page documents - DONE
* Add unpaper to pre OCR document cleanup - DONE
* Count pages in a PDF file http://pybrary.net/pyPdf/ - NOT NEEDED
* Support distributed OCR queues (RabbitMQ & Celery?) - DONE
* Multithreading deferred OCR - DONE
* Role editing view under setup - STARTED
* Scheduled maintenance (cleanup, deferred OCR's) - DONE
* Document list filtering by metadata
* Filterform date filtering widget
* Validate GET data before saving file
@@ -46,20 +50,15 @@
from a queryset
* Allow metadata entry form to mix required and non required metadata
* Link to delete and recreate all document links
* Multithreading deferred OCR
* Versioning support
* Generic document annotations using layer overlays
* Workflows
* Scheduled maintenance (cleanup, deferred OCR's)
* Add tags to documents
* Field for document language or autodetect
* Count pages in a PDF file http://pybrary.net/pyPdf/
* Download a document in different formats: (jpg, png, pdf)
* Download a document in different formats: (jpg, png, pdf)
* Cache.cleanup function to delete cached images when document hash changes
* Divide navigation links search by object and by view
* Add show_summary method to model to display as results of a search
* Support distributed OCR queues (RabbitMQ & Celery?)
* DXF viewer - http://code.google.com/p/dxf-reader/source/browse/#svn%2Ftrunk
* Support spreadsheets, wordprocessing docs using openoffice in server mode
* WebDAV support

View File

@@ -5,3 +5,6 @@ django-extensions==0.6
django-pagination==1.0.7
django-rosetta==0.5.6
wsgiref==0.1.2
celery==2.2.2
django-celery==2.2.2

View File

@@ -2,3 +2,6 @@ Django==1.2.4
distribute==0.6.10
django-pagination==1.0.7
wsgiref==0.1.2
celery==2.2.2
django-celery==2.2.2

View File

@@ -127,6 +127,7 @@ INSTALLED_APPS = (
'converter',
'ocr',
'permissions',
'djcelery',
)
TEMPLATE_CONTEXT_PROCESSORS = (
@@ -220,12 +221,22 @@ LOGIN_EXEMPT_URLS = (
# OCR
#OCR_TESSERACT_PATH = u'/usr/bin/tesseract'
#OCR_MAX_CONCURRENT_EXECUTION = 2
# Permissions
#ROLES_DEFAULT_ROLES = []
# Override
SEARCH_SHOW_OBJECT_TYPE = False
#----------- django-celery --------------
import djcelery
djcelery.setup_loader()
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "guest"
BROKER_PASSWORD = "guest"
BROKER_VHOST = "/"
CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler'
#======== End of configuration options =======
try:

View File

@@ -16,6 +16,7 @@ sys.path.insert(0, ve_path)
# Avoid ``[Errno 13] Permission denied: '/var/www/.python-eggs'`` messages
os.environ['PYTHON_EGG_CACHE'] = '/tmp'
os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'
os.environ['CELERY_LOADER'] = 'django'
from django.core.handlers.wsgi import WSGIHandler
application = WSGIHandler()