diff --git a/apps/bootstrap/api.py b/apps/bootstrap/api.py index 9f9f37fd7c..5d04156f71 100644 --- a/apps/bootstrap/api.py +++ b/apps/bootstrap/api.py @@ -9,13 +9,13 @@ from documents.models import DocumentType, DocumentTypeFilename, Document from metadata.models import MetadataType, MetadataSet from document_indexing.models import Index, IndexTemplateNode from sources.models import WebForm, StagingFolder -from ocr.models import QueueDocument, QueueTransformation, DocumentQueue from history.models import History from taggit.models import Tag from tags.models import TagProperties from folders.models import Folder from dynamic_search.models import RecentSearch from django_gpg.runtime import gpg +# TODO: clear the job queues bootstrap_options = {} @@ -63,18 +63,6 @@ def nuke_database(): for obj in Role.objects.all(): obj.delete() - # Delete all document in the ocr queue - for obj in QueueDocument.objects.all(): - obj.delete() - - # Delete all the transformations for a queue - for obj in QueueTransformation.objects.all(): - obj.delete() - - # Delete all the ocr document queues - for obj in DocumentQueue.objects.all(): - obj.delete() - # Delete all the remaining history events for obj in History.objects.all(): obj.delete() diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index e69de29bb2..0d06afc514 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -0,0 +1,13 @@ +from __future__ import absolute_import + +from django.utils.translation import ugettext_lazy as _ + +from scheduler.api import register_interval_job + +from .tasks import refresh_node, job_queue_poll + +NODE_REFRESH_INTERVAL = 1 +JOB_QUEUE_POLL_INTERVAL = 1 + +register_interval_job('refresh_node', _(u'Update a node\'s properties.'), refresh_node, seconds=NODE_REFRESH_INTERVAL) +register_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JOB_QUEUE_POLL_INTERVAL) diff --git a/apps/job_processor/admin.py 
b/apps/job_processor/admin.py new file mode 100644 index 0000000000..29d0535763 --- /dev/null +++ b/apps/job_processor/admin.py @@ -0,0 +1,34 @@ +from __future__ import absolute_import + +from django.contrib import admin +from django.utils.translation import ugettext_lazy as _ + +from .models import Node, JobQueue, JobQueueItem, Worker + + +class WorkerInline(admin.StackedInline): + list_display = ('name', 'creation_datetime', 'state') + model = Worker + + +class NodeAdmin(admin.ModelAdmin): + list_display = ('hostname', 'cpuload', 'heartbeat', 'memory_usage') + inlines = [WorkerInline] + + +class JobQueueItemInline(admin.StackedInline): + model = JobQueueItem + + +class JobQueueAdmin(admin.ModelAdmin): + model = JobQueue + list_display = ('name', 'label', 'total_items') + inlines = [JobQueueItemInline] + + def total_items(self, obj): + return obj.items.all().count() + total_items.short_description = _(u'total items') + + +admin.site.register(Node, NodeAdmin) +admin.site.register(JobQueue, JobQueueAdmin) diff --git a/apps/job_processor/api.py b/apps/job_processor/api.py deleted file mode 100644 index 00b9736fef..0000000000 --- a/apps/job_processor/api.py +++ /dev/null @@ -1,2 +0,0 @@ -def process_job(func, *args, **kwargs): - return func(*args, **kwargs) diff --git a/apps/job_processor/exceptions.py b/apps/job_processor/exceptions.py new file mode 100644 index 0000000000..bac36e7b37 --- /dev/null +++ b/apps/job_processor/exceptions.py @@ -0,0 +1,14 @@ +#class WorkerAlreadyDisabled(Exception): +# pass + + +#class WorkerAlreadyEnabled(Exception): +# pass + + +class JobQueuePushError(Exception): + pass + + +class JobQueueNoPendingJobs(Exception): + pass diff --git a/apps/job_processor/literals.py b/apps/job_processor/literals.py new file mode 100644 index 0000000000..a8a1bdf7aa --- /dev/null +++ b/apps/job_processor/literals.py @@ -0,0 +1,19 @@ +from django.utils.translation import ugettext_lazy as _ + +WORKER_STATE_RUNNING = 'r' +WORKER_STATE_DEAD = 'd' + 
+WORKER_STATE_CHOICES = ( + (WORKER_STATE_RUNNING, _(u'running')), + (WORKER_STATE_DEAD, _(u'dead')), +) + +JOB_STATE_PENDING = 'p' +JOB_STATE_PROCESSING = 'r' +JOB_STATE_ERROR = 'e' + +JOB_STATE_CHOICES = ( + (JOB_STATE_PENDING, _(u'pending')), + (JOB_STATE_PROCESSING, _(u'processing')), + (JOB_STATE_ERROR, _(u'error')), +) diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index 71a8362390..90859397a6 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -1,3 +1,239 @@ -from django.db import models +from __future__ import absolute_import -# Create your models here. +import os +import datetime +import uuid +import hashlib +import platform +from multiprocessing import Process + +from django.db import models, IntegrityError, transaction +from django.db import close_connection +from django.contrib.contenttypes import generic +from django.utils.translation import ugettext_lazy as _ +from django.utils.translation import ugettext +from django.utils.simplejson import loads, dumps + +from common.models import Singleton +from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING, + JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES, + WORKER_STATE_RUNNING) +from .exceptions import JobQueuePushError, JobQueueNoPendingJobs +#from .exceptions import (WorkerAlreadyDisabled, WorkerAlreadyEnabled) + +job_queue_labels = {} +job_types_registry = {} + + +class Job(object): + def __init__(self, function, job_queue_item): + close_connection() + # Run sync or launch async subprocess + # OR launch 2 processes: monitor & actual process + node = Node.objects.get_myself() + worker = Worker.objects.create(node=node, name=u'%s-%d' % (node.hostname, os.getpid())) + try: + close_connection() + transaction.commit_on_success(function)(**loads(job_queue_item.kwargs)) + #function(**loads(job_queue_item.kwargs)) + except Exception, exc: + close_connection() + transaction.rollback() + close_connection() + def set_state_error(): + 
job_queue_item.result = exc + job_queue_item.state = JOB_STATE_ERROR + job_queue_item.save() + transaction.commit_on_success(set_state_error)() + else: + job_queue_item.delete() + finally: + worker.delete() + + +class JobType(object): + def __init__(self, name, label, function): + self.name = name + self.label = label + self.function = function + job_types_registry[self.name] = self + + def run(self, job_queue_item, **kwargs): + job_queue_item.state = JOB_STATE_PROCESSING + job_queue_item.save() + p = Process(target=Job, args=(self.function, job_queue_item,)) + p.start() + #p.join() + + +class NodeManager(models.Manager): + def get_myself(self): + return self.model.objects.get(hostname=platform.node()) + + +class Node(models.Model): + hostname = models.CharField(max_length=255, verbose_name=_(u'hostname')) + cpuload = models.PositiveIntegerField(blank=True, default=0, verbose_name=_(u'cpu load')) + heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now(), verbose_name=_(u'last heartbeat check')) + memory_usage = models.FloatField(blank=True, verbose_name=_(u'memory usage')) + + objects = NodeManager() + + def __unicode__(self): + return self.hostname + + def save(self, *args, **kwargs): + self.heartbeat = datetime.datetime.now() + return super(Node, self).save(*args, **kwargs) + + class Meta: + verbose_name = _(u'node') + verbose_name_plural = _(u'nodes') + + +class JobQueueManager(models.Manager): + def get_or_create(self, *args, **kwargs): + job_queue_labels[kwargs.get('name')] = kwargs.get('defaults', {}).get('label') + return super(JobQueueManager, self).get_or_create(*args, **kwargs) + + +class JobQueue(models.Model): + # TODO: support for stopping and starting job queues + # Internal name + name = models.CharField(max_length=32, verbose_name=_(u'name'), unique=True) + unique_jobs = models.BooleanField(verbose_name=_(u'unique jobs'), default=True) + + objects = JobQueueManager() + + def __unicode__(self): + return unicode(self.label) or 
self.name
datetime.datetime.now() + + if self.job_queue.unique_jobs: + self.unique_id = hashlib.sha256(u'%s-%s' % (self.job_type, self.kwargs)).hexdigest() + else: + self.unique_id = unicode(uuid.uuid4()) + try: + super(JobQueueItem, self).save(*args, **kwargs) + except IntegrityError: + # TODO: Maybe replace instead of rasining exception w/ replace flag + raise JobQueuePushError + + def run(self): + job_type_instance = job_types_registry.get(self.job_type) + job_type_instance.run(self) + + class Meta: + ordering = ('creation_datetime',) + verbose_name = _(u'job queue item') + verbose_name_plural = _(u'job queue items') + + +class Worker(models.Model): + node = models.ForeignKey(Node, verbose_name=_(u'node')) + name = models.CharField(max_length=255, verbose_name=_(u'name')) + creation_datetime = models.DateTimeField(verbose_name=_(u'creation datetime'), default=lambda: datetime.datetime.now(), editable=False) + heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now(), verbose_name=_(u'heartbeat check')) + state = models.CharField(max_length=4, + choices=WORKER_STATE_CHOICES, + default=WORKER_STATE_RUNNING, + verbose_name=_(u'state')) + + #def disable(self): + # if self.state == WORKER_STATE_DISABLED: + # raise WorkerAlreadyDisabled + # + # self.state = WORKER_STATE_DISABLED + # self.save() + # + #def enable(self): + # if self.state == WORKER_STATE_ENABLED: + # raise WorkerAlreadyEnabled + # + # self.state = WORKER_STATE_ENABLED + # self.save() + # + #def is_enabled(self): + # return self.state == WORKER_STATE_ENABLED + + class Meta: + ordering = ('creation_datetime',) + verbose_name = _(u'worker') + verbose_name_plural = _(u'workers') + +""" +class JobProcessingConfig(Singleton): + worker_time_to_live = models.PositiveInteger(verbose_name=(u'time to live (in seconds)') # After this time a worker is considered dead + worker_heartbeat_interval = models.PositiveInteger(verbose_name=(u'heartbeat interval') + node_heartbeat_interval = 
models.PositiveInteger(verbose_name=(u'heartbeat interval') + + def __unicode__(self): + return ugettext('Workers configuration') + + class Meta: + verbose_name = verbose_name_plural = _(u'Workers configuration') +""" diff --git a/apps/job_processor/tasks.py b/apps/job_processor/tasks.py new file mode 100644 index 0000000000..7117c49a53 --- /dev/null +++ b/apps/job_processor/tasks.py @@ -0,0 +1,58 @@ +from __future__ import absolute_import + +from datetime import timedelta, datetime +import platform +import logging +import psutil + +from lock_manager import Lock, LockError + +from .models import Node, JobQueue +from .exceptions import JobQueueNoPendingJobs + +LOCK_EXPIRE = 10 +# TODO: Tie LOCK_EXPIRATION with hard task timeout + +logger = logging.getLogger(__name__) + + +def refresh_node(): + logger.debug('starting') + + lock_id = u'refresh_node' + try: + logger.debug('trying to acquire lock: %s' % lock_id) + lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE) + logger.debug('acquired lock: %s' % lock_id) + node, created = Node.objects.get_or_create(hostname=platform.node(), defaults={'memory_usage': 0.0}) + node.cpuload = psutil.cpu_percent() + node.memory_usage = psutil.phymem_usage().percent + node.save() + lock.release() + except LockError: + logger.debug('unable to obtain lock') + except Exception: + lock.release() + raise + + +def job_queue_poll(): + logger.debug('starting') + + lock_id = u'job_queue_poll' + try: + logger.debug('trying to acquire lock: %s' % lock_id) + lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE) + logger.debug('acquired lock: %s' % lock_id) + for job_queue in JobQueue.objects.all(): + try: + job_item = job_queue.get_oldest_pending_job() + job_item.run() + except JobQueueNoPendingJobs: + logger.debug('no pending jobs for job queue: %s' % job_queue) + lock.release() + except LockError: + logger.debug('unable to obtain lock') + except Exception: + lock.release() + raise diff --git a/apps/lock_manager/models.py b/apps/lock_manager/models.py index 
a70af8230c..89b49e5881 100644 --- a/apps/lock_manager/models.py +++ b/apps/lock_manager/models.py @@ -2,7 +2,8 @@ from __future__ import absolute_import import datetime -from django.db import models +from django.db import close_connection +from django.db import models, transaction from django.utils.translation import ugettext_lazy as _ from .managers import LockManager @@ -26,13 +27,17 @@ class Lock(models.Model): super(Lock, self).save(*args, **kwargs) + @transaction.commit_on_success def release(self): + close_connection() try: lock = Lock.objects.get(name=self.name, creation_datetime=self.creation_datetime) lock.delete() except Lock.DoesNotExist: # Out lock expired and was reassigned pass + except DatabaseError: + transaction.rollback() class Meta: verbose_name = _(u'lock') diff --git a/apps/ocr/__init__.py b/apps/ocr/__init__.py index f824a23e9d..7ea9921e14 100644 --- a/apps/ocr/__init__.py +++ b/apps/ocr/__init__.py @@ -17,11 +17,11 @@ from project_tools.api import register_tool from acls.api import class_permissions from scheduler.api import register_interval_job from statistics.api import register_statistics -from queue_manager.models import Queue +from job_processor.models import JobQueue, JobType from .conf.settings import (AUTOMATIC_OCR, QUEUE_PROCESSING_INTERVAL) from .models import OCRProcessingSingleton -from .tasks import task_process_document_queues +from .api import do_document_ocr from .permissions import PERMISSION_OCR_DOCUMENT from .exceptions import AlreadyQueued from . 
import models as ocr_models @@ -29,6 +29,7 @@ from .statistics import get_statistics from .literals import OCR_QUEUE_NAME logger = logging.getLogger(__name__) +ocr_job_queue = None from .links import (submit_document, re_queue_multiple_document, queue_document_multiple_delete, ocr_disable, @@ -37,21 +38,17 @@ from .links import (submit_document, re_queue_multiple_document, bind_links([Document], [submit_document]) bind_links([OCRProcessingSingleton], [ocr_disable, ocr_enable]) -#bind_links([QueueTransformation], [setup_queue_transformation_edit, setup_queue_transformation_delete]) - #register_multi_item_links(['queue_document_list'], [re_queue_multiple_document, queue_document_multiple_delete]) -#bind_links(['setup_queue_transformation_create', 'setup_queue_transformation_edit', 'setup_queue_transformation_delete', 'document_queue_disable', 'document_queue_enable', 'queue_document_list', 'setup_queue_transformation_list'], [queue_document_list], menu_name='secondary_menu') -#bind_links(['setup_queue_transformation_edit', 'setup_queue_transformation_delete', 'setup_queue_transformation_list', 'setup_queue_transformation_create'], [setup_queue_transformation_create], menu_name='sidebar') - register_maintenance_links([all_document_ocr_cleanup], namespace='ocr', title=_(u'OCR')) -#register_multi_item_links(['folder_view', 'search', 'results', 'index_instance_node_view', 'document_find_duplicates', 'document_type_document_list', 'document_group_view', 'document_list', 'document_list_recent'], [submit_document_multiple]) +register_multi_item_links(['folder_view', 'search', 'results', 'index_instance_node_view', 'document_find_duplicates', 'document_type_document_list', 'document_group_view', 'document_list', 'document_list_recent'], [submit_document_multiple]) @transaction.commit_on_success -def create_ocr_queue(): +def create_ocr_job_queue(): + global ocr_job_queue try: - queue, created = Queue.objects.get_or_create(name=OCR_QUEUE_NAME, defaults={'label': _('OCR'), 
'unique_names': True}) + ocr_job_queue, created = JobQueue.objects.get_or_create(name=OCR_QUEUE_NAME, defaults={'label': _('OCR'), 'unique_jobs': True}) except DatabaseError: transaction.rollback() @@ -76,12 +73,6 @@ def document_post_save(sender, instance, **kwargs): # logger.debug('got call_queue signal: %s' % kwargs) # task_process_document_queues() - -#@receiver(post_syncdb, dispatch_uid='create_ocr_queue_on_syncdb', sender=ocr_models) -#def create_ocr_queue_on_syncdb(sender, **kwargs): - -#register_interval_job('task_process_document_queues', _(u'Checks the OCR queue for pending documents.'), task_process_document_queues, seconds=QUEUE_PROCESSING_INTERVAL) - register_tool(ocr_tool_link) class_permissions(Document, [ @@ -89,4 +80,5 @@ class_permissions(Document, [ ]) #register_statistics(get_statistics) -create_ocr_queue() +create_ocr_job_queue() +ocr_job_type = JobType('ocr', _(u'OCR'), do_document_ocr) diff --git a/apps/ocr/admin.py b/apps/ocr/admin.py deleted file mode 100644 index 1689ad7a52..0000000000 --- a/apps/ocr/admin.py +++ /dev/null @@ -1,22 +0,0 @@ -""" -from __future__ import absolute_import - -from django.contrib import admin - -from .models import DocumentQueue, QueueDocument - - -class QueueDocumentInline(admin.StackedInline): - model = QueueDocument - extra = 1 - classes = ('collapse-open',) - allow_add = True - - -class DocumentQueueAdmin(admin.ModelAdmin): - inlines = [QueueDocumentInline] - list_display = ('name', 'label', 'state') - - -admin.site.register(DocumentQueue, DocumentQueueAdmin) -""" diff --git a/apps/ocr/api.py b/apps/ocr/api.py index 2cc3fad6f4..5af659a4b3 100644 --- a/apps/ocr/api.py +++ b/apps/ocr/api.py @@ -12,7 +12,7 @@ from django.utils.importlib import import_module from common.conf.settings import TEMPORARY_DIRECTORY from converter.api import convert -from documents.models import DocumentPage +from documents.models import DocumentPage, DocumentVersion from .conf.settings import (TESSERACT_PATH, TESSERACT_LANGUAGE, 
UNPAPER_PATH) from .exceptions import TesseractError, UnpaperError @@ -81,25 +81,25 @@ def run_tesseract(input_filename, lang=None): return text -def do_document_ocr(queue_document): +def do_document_ocr(document_version_pk): """ Try first to extract text from document pages using the registered parser, if the parser fails or if there is no parser registered for the document mimetype do a visual OCR by calling tesseract """ - for document_page in queue_document.document_version.pages.all(): + document_version = DocumentVersion.objects.get(pk=document_version_pk) + for document_page in document_version.pages.all(): try: # Try to extract text by means of a parser parse_document_page(document_page) except (ParserError, ParserUnknownFile): # Fall back to doing visual OCR - ocr_transformations, warnings = queue_document.get_transformation_list() document_filepath = document_page.document.get_image_cache_name(page=document_page.page_number, version=document_page.document_version.pk) unpaper_output_filename = u'%s_unpaper_out_page_%s%s%s' % (document_page.document.uuid, document_page.page_number, os.extsep, UNPAPER_FILE_FORMAT) unpaper_output_filepath = os.path.join(TEMPORARY_DIRECTORY, unpaper_output_filename) - unpaper_input = convert(document_filepath, file_format=UNPAPER_FILE_FORMAT, transformations=ocr_transformations) + unpaper_input = convert(document_filepath, file_format=UNPAPER_FILE_FORMAT) execute_unpaper(input_filepath=unpaper_input, output_filepath=unpaper_output_filepath) #from PIL import Image, ImageOps diff --git a/apps/ocr/forms.py b/apps/ocr/forms.py deleted file mode 100644 index 0fde716bbb..0000000000 --- a/apps/ocr/forms.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -from __future__ import absolute_import - -from django import forms - -from .models import QueueTransformation - - -class QueueTransformationForm(forms.ModelForm): - class Meta: - model = QueueTransformation - - def __init__(self, *args, **kwargs): - super(QueueTransformationForm, 
self).__init__(*args, **kwargs) - self.fields['content_type'].widget = forms.HiddenInput() - self.fields['object_id'].widget = forms.HiddenInput() - - -class QueueTransformationForm_create(forms.ModelForm): - class Meta: - model = QueueTransformation - exclude = ('content_type', 'object_id') -""" diff --git a/apps/ocr/links.py b/apps/ocr/links.py index 5c708ae4b2..84a01e8238 100644 --- a/apps/ocr/links.py +++ b/apps/ocr/links.py @@ -30,8 +30,3 @@ queue_document_multiple_delete = Link(text=_(u'delete'), view='queue_document_mu all_document_ocr_cleanup = Link(text=_(u'clean up pages content'), view='all_document_ocr_cleanup', sprite='text_strikethrough', permissions=[PERMISSION_OCR_CLEAN_ALL_PAGES], description=_(u'Runs a language filter to remove common OCR mistakes from document pages content.')) ocr_tool_link = Link(text=_(u'OCR'), view='ocr_log', sprite='hourglass', icon='text.png', permissions=[PERMISSION_OCR_DOCUMENT]) # children_view_regex=[r'queue_', r'document_queue']) - -#setup_queue_transformation_list = Link(text=_(u'transformations'), view='setup_queue_transformation_list', args='queue.pk', sprite='shape_move_front') -#setup_queue_transformation_create = Link(text=_(u'add transformation'), view='setup_queue_transformation_create', args='queue.pk', sprite='shape_square_add') -#setup_queue_transformation_edit = Link(text=_(u'edit'), view='setup_queue_transformation_edit', args='transformation.pk', sprite='shape_square_edit') -#setup_queue_transformation_delete = Link(text=_(u'delete'), view='setup_queue_transformation_delete', args='transformation.pk', sprite='shape_square_delete') diff --git a/apps/ocr/literals.py b/apps/ocr/literals.py index 761cd017d5..b7d10f8615 100644 --- a/apps/ocr/literals.py +++ b/apps/ocr/literals.py @@ -9,17 +9,6 @@ OCR_STATE_CHOICES = ( (OCR_STATE_ENABLED, _(u'enabled')), ) - -#QUEUEDOCUMENT_STATE_PENDING = 'p' -#QUEUEDOCUMENT_STATE_PROCESSING = 'i' -#QUEUEDOCUMENT_STATE_ERROR = 'e' - -#QUEUEDOCUMENT_STATE_CHOICES = ( -# 
(QUEUEDOCUMENT_STATE_PENDING, _(u'pending')), -# (QUEUEDOCUMENT_STATE_PROCESSING, _(u'processing')), -# (QUEUEDOCUMENT_STATE_ERROR, _(u'error')), -#) - DEFAULT_OCR_FILE_FORMAT = u'tiff' DEFAULT_OCR_FILE_EXTENSION = u'tif' UNPAPER_FILE_FORMAT = u'ppm' diff --git a/apps/ocr/models.py b/apps/ocr/models.py index 3cecf15951..9898db7060 100644 --- a/apps/ocr/models.py +++ b/apps/ocr/models.py @@ -53,146 +53,3 @@ class OCRProcessingSingleton(Singleton): class Meta: verbose_name = verbose_name_plural = _(u'OCR processing properties') - -""" -class OCRLog(models.Model): - #queue = models.ForeignKey(Queue, verbose_name=_(u'queue')) - document_version = models.ForeignKey(DocumentVersion, verbose_name=_(u'document version')) - datetime = models.DateTimeField(verbose_name=_(u'date time'), default=lambda: datetime.datetime.now(), db_index=True) - delay = models.BooleanField(verbose_name=_(u'delay OCR'), default=False) - #state = models.CharField(max_length=4, - # choices=QUEUEDOCUMENT_STATE_CHOICES, - # default=QUEUEDOCUMENT_STATE_PENDING, - # verbose_name=_(u'state')) - result = models.TextField(blank=True, null=True, verbose_name=_(u'result')) - #node_name = models.CharField(max_length=32, verbose_name=_(u'node name'), blank=True, null=True) - - class Meta: - ordering = ('datetime',) - verbose_name = _(u'OCR log entry') - verbose_name_plural = _(u'OCR log entries') - - #def get_transformation_list(self): - # return QueueTransformation.transformations.get_for_object_as_list(self) - - def requeue(self): - pass - #if self.state == QUEUEDOCUMENT_STATE_PROCESSING: - # raise ReQueueError - #else: - # self.datetime_submitted = datetime.now() - # self.state = QUEUEDOCUMENT_STATE_PENDING - # self.delay = False - # self.result = None - # self.node_name = None - # self.save() - - def __unicode__(self): - try: - return unicode(self.document) - except ObjectDoesNotExist: - return ugettext(u'Missing document.') -""" - -#class DocumentQueue(models.Model): -# name = 
models.CharField(max_length=64, unique=True, verbose_name=_(u'name')) -# label = models.CharField(max_length=64, verbose_name=_(u'label')) -# state = models.CharField(max_length=4, -# choices=DOCUMENTQUEUE_STATE_CHOICES, -# default=DOCUMENTQUEUE_STATE_ACTIVE, -# verbose_name=_(u'state')) -# -# objects = DocumentQueueManager()# -# -# class Meta: -# verbose_name = _(u'document queue') -# verbose_name_plural = _(u'document queues')# -# -# def __unicode__(self): -# return self.label - - - -""" -class QueueDocument(models.Model): - document_queue = models.ForeignKey(DocumentQueue, verbose_name=_(u'document queue')) - document = models.ForeignKey(Document, verbose_name=_(u'document')) - datetime_submitted = models.DateTimeField(verbose_name=_(u'date time submitted'), auto_now_add=True, db_index=True) - delay = models.BooleanField(verbose_name=_(u'delay ocr'), default=False) - state = models.CharField(max_length=4, - choices=QUEUEDOCUMENT_STATE_CHOICES, - default=QUEUEDOCUMENT_STATE_PENDING, - verbose_name=_(u'state')) - result = models.TextField(blank=True, null=True, verbose_name=_(u'result')) - node_name = models.CharField(max_length=32, verbose_name=_(u'node name'), blank=True, null=True) - - class Meta: - ordering = ('datetime_submitted',) - verbose_name = _(u'queue document') - verbose_name_plural = _(u'queue documents') - - def get_transformation_list(self): - return QueueTransformation.transformations.get_for_object_as_list(self) - - def requeue(self): - if self.state == QUEUEDOCUMENT_STATE_PROCESSING: - raise ReQueueError - else: - self.datetime_submitted = datetime.now() - self.state = QUEUEDOCUMENT_STATE_PENDING - self.delay = False - self.result = None - self.node_name = None - self.save() - - def __unicode__(self): - try: - return unicode(self.document) - except ObjectDoesNotExist: - return ugettext(u'Missing document.') - - -class ArgumentsValidator(object): - message = _(u'Enter a valid value.') - code = 'invalid' - - def __init__(self, message=None, 
code=None): - if message is not None: - self.message = message - if code is not None: - self.code = code - - def __call__(self, value): - ''' - Validates that the input evaluates correctly. - ''' - value = value.strip() - try: - literal_eval(value) - except (ValueError, SyntaxError): - raise ValidationError(self.message, code=self.code) - - -class QueueTransformation(models.Model): - ''' - Model that stores the transformation and transformation arguments - for a given document queue - ''' - content_type = models.ForeignKey(ContentType) - object_id = models.PositiveIntegerField() - content_object = generic.GenericForeignKey('content_type', 'object_id') - order = models.PositiveIntegerField(default=0, blank=True, null=True, verbose_name=_(u'order'), db_index=True) - transformation = models.CharField(choices=get_available_transformations_choices(), max_length=128, verbose_name=_(u'transformation')) - arguments = models.TextField(blank=True, null=True, verbose_name=_(u'arguments'), help_text=_(u'Use dictionaries to indentify arguments, example: %s') % u'{\'degrees\':90}', validators=[ArgumentsValidator()]) - - objects = models.Manager() - transformations = SourceTransformationManager() - - def __unicode__(self): - return self.get_transformation_display() - - class Meta: - ordering = ('order',) - verbose_name = _(u'document queue transformation') - verbose_name_plural = _(u'document queue transformations') -""" diff --git a/apps/ocr/tasks.py b/apps/ocr/tasks.py deleted file mode 100644 index 9780c8b2a8..0000000000 --- a/apps/ocr/tasks.py +++ /dev/null @@ -1,75 +0,0 @@ -from __future__ import absolute_import - -from datetime import timedelta, datetime -import platform -import logging - -from django.db.models import Q - -from job_processor.api import process_job -from lock_manager import Lock, LockError - -from .api import do_document_ocr -#from .literals import (QUEUEDOCUMENT_STATE_PENDING, -# QUEUEDOCUMENT_STATE_PROCESSING, DOCUMENTQUEUE_STATE_ACTIVE, -# 
QUEUEDOCUMENT_STATE_ERROR) -#from .models import QueueDocument, DocumentQueue -from .conf.settings import NODE_CONCURRENT_EXECUTION, REPLICATION_DELAY - -LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes -# TODO: Tie LOCK_EXPIRATION with hard task timeout - -logger = logging.getLogger(__name__) - - -def task_process_queue_document(queue_document_id): - lock_id = u'task_proc_queue_doc-%d' % queue_document_id - try: - logger.debug('trying to acquire lock: %s' % lock_id) - lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE) - logger.debug('acquired lock: %s' % lock_id) - queue_document = QueueDocument.objects.get(pk=queue_document_id) - queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING - queue_document.node_name = platform.node() - queue_document.save() - try: - do_document_ocr(queue_document) - queue_document.delete() - except Exception, e: - queue_document.state = QUEUEDOCUMENT_STATE_ERROR - queue_document.result = e - queue_document.save() - - lock.release() - except LockError: - logger.debug('unable to obtain lock') - pass - - -def task_process_document_queues(): - logger.debug('executed') - # TODO: reset_orphans() - q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING) - q_delayed = Q(delay=True) - q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY)) - for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE): - current_local_processing_count = QueueDocument.objects.filter( - state=QUEUEDOCUMENT_STATE_PROCESSING).filter( - node_name=platform.node()).count() - if current_local_processing_count < NODE_CONCURRENT_EXECUTION: - try: - oldest_queued_document_qs = document_queue.queuedocument_set.filter( - (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval)) - - if oldest_queued_document_qs: - oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0] - process_job(task_process_queue_document, oldest_queued_document.pk) - except Exception, e: - 
logger.error('unhandled exception: %s' % e) - finally: - # Don't process anymore from this queryset, might be stale - break - else: - logger.debug('already processing maximun') - else: - logger.debug('nothing to process') diff --git a/apps/ocr/views.py b/apps/ocr/views.py index d4dcb109d6..1c019be7ac 100644 --- a/apps/ocr/views.py +++ b/apps/ocr/views.py @@ -14,17 +14,16 @@ from documents.models import Document from documents.widgets import document_link, document_thumbnail from common.utils import encapsulate from acls.models import AccessEntry +from job_processor.exceptions import JobQueuePushError from .permissions import (PERMISSION_OCR_DOCUMENT, PERMISSION_OCR_DOCUMENT_DELETE, PERMISSION_OCR_QUEUE_ENABLE_DISABLE, PERMISSION_OCR_CLEAN_ALL_PAGES, PERMISSION_OCR_QUEUE_EDIT) from .models import OCRProcessingSingleton -#from .literals import (QUEUEDOCUMENT_STATE_PROCESSING, -# DOCUMENTQUEUE_STATE_ACTIVE, DOCUMENTQUEUE_STATE_STOPPED) from .exceptions import (AlreadyQueued, ReQueueError, OCRProcessingAlreadyDisabled, OCRProcessingAlreadyEnabled) from .api import clean_pages -#from .forms import QueueTransformationForm, QueueTransformationForm_create +from . 
import ocr_job_queue, ocr_job_type def ocr_log(request): @@ -195,15 +194,15 @@ def submit_document(request, document_id): def submit_document_to_queue(request, document, post_submit_redirect=None): - ''' + """ This view is meant to be reusable - ''' + """ try: - document_queue = DocumentQueue.objects.queue_document(document) - messages.success(request, _(u'Document: %(document)s was added to the OCR queue: %(queue)s.') % { - 'document': document, 'queue': document_queue.label}) - except AlreadyQueued: + ocr_job_queue.push(ocr_job_type, document_version_pk=document.latest_version.pk) + messages.success(request, _(u'Document: %(document)s was added to the OCR queue sucessfully.') % { + 'document': document}) + except JobQueuePushError: messages.warning(request, _(u'Document: %(document)s is already queued.') % { 'document': document}) except Exception, e: diff --git a/apps/queue_manager/__init__.py b/apps/queue_manager/__init__.py deleted file mode 100755 index 5b40c4e8ec..0000000000 --- a/apps/queue_manager/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -from queue_manager.models import Queue as QueueModel, QueuePushError - -class Queue(object): - @classmethod - def __new__(cls, name, queue_name, label=None, unique_names=False): - name = queue_name - if not label: - label=u'' - queue, created = QueueModel.objects.get_or_create( - name=name, - defaults={ - 'label': label, - 'unique_names': unique_names - } - ) - if not created: - queue.label = label - queue.unique_names = unique_names - queue.save() - return queue diff --git a/apps/queue_manager/admin.py b/apps/queue_manager/admin.py deleted file mode 100755 index 14bb1dcb74..0000000000 --- a/apps/queue_manager/admin.py +++ /dev/null @@ -1,22 +0,0 @@ -from django.contrib import admin - -from django.utils.translation import ugettext_lazy as _ - -from queue_manager.models import Queue, QueueItem - - -class QueueItemInline(admin.StackedInline): - model = QueueItem - - -class QueueAdmin(admin.ModelAdmin): - model = Queue - 
list_display = ('name', 'label', 'total_items') - inlines = [QueueItemInline] - - def total_items(self, obj): - return obj.items.all().count() - total_items.short_description = _(u'total items') - - -admin.site.register(Queue, QueueAdmin) diff --git a/apps/queue_manager/exceptions.py b/apps/queue_manager/exceptions.py deleted file mode 100644 index fc356e3b69..0000000000 --- a/apps/queue_manager/exceptions.py +++ /dev/null @@ -1,6 +0,0 @@ -class QueueException(Exception): - pass - - -class QueuePushError(QueueException): - pass diff --git a/apps/queue_manager/migrations/0001_initial.py b/apps/queue_manager/migrations/0001_initial.py deleted file mode 100644 index 22215e726d..0000000000 --- a/apps/queue_manager/migrations/0001_initial.py +++ /dev/null @@ -1,57 +0,0 @@ -# -*- coding: utf-8 -*- -import datetime -from south.db import db -from south.v2 import SchemaMigration -from django.db import models - - -class Migration(SchemaMigration): - - def forwards(self, orm): - # Adding model 'Queue' - db.create_table('queue_manager_queue', ( - ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), - ('name', self.gf('django.db.models.fields.CharField')(unique=True, max_length=32)), - ('unique_names', self.gf('django.db.models.fields.BooleanField')(default=False)), - )) - db.send_create_signal('queue_manager', ['Queue']) - - # Adding model 'QueueItem' - db.create_table('queue_manager_queueitem', ( - ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), - ('queue', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['queue_manager.Queue'])), - ('creation_datetime', self.gf('django.db.models.fields.DateTimeField')()), - ('unique_name', self.gf('django.db.models.fields.CharField')(unique=True, max_length=32, blank=True)), - ('name', self.gf('django.db.models.fields.CharField')(max_length=32, blank=True)), - ('data', self.gf('django.db.models.fields.TextField')()), - )) - db.send_create_signal('queue_manager', ['QueueItem']) - - - def 
backwards(self, orm): - # Deleting model 'Queue' - db.delete_table('queue_manager_queue') - - # Deleting model 'QueueItem' - db.delete_table('queue_manager_queueitem') - - - models = { - 'queue_manager.queue': { - 'Meta': {'object_name': 'Queue'}, - 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), - 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), - 'unique_names': ('django.db.models.fields.BooleanField', [], {'default': 'False'}) - }, - 'queue_manager.queueitem': { - 'Meta': {'object_name': 'QueueItem'}, - 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), - 'data': ('django.db.models.fields.TextField', [], {}), - 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), - 'name': ('django.db.models.fields.CharField', [], {'max_length': '32', 'blank': 'True'}), - 'queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['queue_manager.Queue']"}), - 'unique_name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32', 'blank': 'True'}) - } - } - - complete_apps = ['queue_manager'] \ No newline at end of file diff --git a/apps/queue_manager/migrations/__init__.py b/apps/queue_manager/migrations/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/apps/queue_manager/models.py b/apps/queue_manager/models.py deleted file mode 100755 index e82781723b..0000000000 --- a/apps/queue_manager/models.py +++ /dev/null @@ -1,95 +0,0 @@ -from __future__ import absolute_import - -from datetime import datetime - -from django.db import models -from django.utils.translation import ugettext_lazy as _ -from django.utils.simplejson import loads, dumps -from django.db import IntegrityError - -from .exceptions import QueuePushError - -queue_labels = {} - - -class QueueManager(models.Manager): - def get_or_create(self, *args, **kwargs): - queue_labels[kwargs.get('name')] = kwargs.get('defaults', {}).get('label') - return 
super(QueueManager, self).get_or_create(*args, **kwargs) - - -class Queue(models.Model): - # Internal name - name = models.CharField(max_length=32, verbose_name=_(u'name'), unique=True) - unique_names = models.BooleanField(verbose_name=_(u'unique names'), default=False) - - objects = QueueManager() - - def __unicode__(self): - return unicode(self.label) or self.name - - @property - def label(self): - return queue_labels.get(self.name) - - def push(self, data, name=None): # TODO: add replace flag - if not name: - name = u'' - queue_item = QueueItem(queue=self, name=name, data=dumps(data)) - queue_item.save() - return queue_item - - def pull(self): - queue_item_qs = QueueItem.objects.filter(queue=self).order_by('-creation_datetime') - if queue_item_qs: - queue_item = queue_item_qs[0] - queue_item.delete() - return loads(queue_item.data) - - @property - def items(self): - return self.queueitem_set - - def empty(self): - self.items.all().delete() - - def save(self, *args, **kwargs): - label = getattr(self, 'label', None) - if label: - queue_labels[self.name] = label - return super(Queue, self).save(*args, **kwargs) - - # TODO: custom runtime methods - - class Meta: - verbose_name = _(u'queue') - verbose_name_plural = _(u'queues') - - -class QueueItem(models.Model): - queue = models.ForeignKey(Queue, verbose_name=_(u'queue')) - creation_datetime = models.DateTimeField(verbose_name=_(u'creation datetime'), editable=False) - unique_name = models.CharField(blank=True, max_length=32, verbose_name=_(u'name'), unique=True, editable=False) - name = models.CharField(blank=True, max_length=32, verbose_name=_(u'name')) - data = models.TextField(verbose_name=_(u'data')) - - def __unicode__(self): - return self.name - - def save(self, *args, **kwargs): - self.creation_datetime = datetime.now() - - if self.queue.unique_names: - self.unique_name = self.name - else: - self.unique_name = unicode(self.creation_datetime) - try: - super(QueueItem, self).save(*args, **kwargs) - except 
IntegrityError: - # TODO: Maybe replace instead or rasining exception w/ replace flag - raise QueuePushError - - class Meta: - verbose_name = _(u'queue item') - verbose_name_plural = _(u'queue items') - diff --git a/apps/queue_manager/views.py b/apps/queue_manager/views.py deleted file mode 100755 index 60f00ef0ef..0000000000 --- a/apps/queue_manager/views.py +++ /dev/null @@ -1 +0,0 @@ -# Create your views here. diff --git a/apps/scheduler/__init__.py b/apps/scheduler/__init__.py index 18d7ea6e9f..e9e693b024 100644 --- a/apps/scheduler/__init__.py +++ b/apps/scheduler/__init__.py @@ -17,6 +17,9 @@ from .links import job_list logger = logging.getLogger(__name__) + +# TODO: shutdown scheduler on pre_syncdb to avoid accessing non existing models + @receiver(post_syncdb, dispatch_uid='scheduler_shutdown_post_syncdb') def scheduler_shutdown_post_syncdb(sender, **kwargs): logger.debug('Scheduler shut down on post syncdb signal') diff --git a/settings.py b/settings.py index e9d25d00c7..c627ac33f6 100644 --- a/settings.py +++ b/settings.py @@ -154,7 +154,6 @@ INSTALLED_APPS = ( 'navigation', 'lock_manager', 'web_theme', - 'queue_manager', # pagination needs to go after web_theme so that the pagination template is found 'pagination', 'common', @@ -188,7 +187,7 @@ INSTALLED_APPS = ( 'workflows', 'checkouts', 'rest_api', - 'bootstrap', + #'bootstrap', 'statistics', # Has to be last so the other apps can register it's signals