diff --git a/.gitignore b/.gitignore index 6a625abb94..c517550e4d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,15 +1,15 @@ *.orig *.pyc *.pyo -site_media/photologue/photos/* -site_media/photologue/photos/cache/* -*.sqlite -settings_local.py -site_media/documents/* -celerybeat-schedule -document_storage/ -misc/mayan.geany -image_cache/ +/*.sqlite +/settings_local.py +/celerybeat-schedule +/document_storage/ +/misc/mayan.geany +/image_cache/ build/ _build/ -gpg_home/ +/gpg_home/ +/static/ +/whoosh_index/ +/fabfile_install diff --git a/apps/bootstrap/api.py b/apps/bootstrap/api.py index 9f9f37fd7c..5d04156f71 100644 --- a/apps/bootstrap/api.py +++ b/apps/bootstrap/api.py @@ -9,13 +9,13 @@ from documents.models import DocumentType, DocumentTypeFilename, Document from metadata.models import MetadataType, MetadataSet from document_indexing.models import Index, IndexTemplateNode from sources.models import WebForm, StagingFolder -from ocr.models import QueueDocument, QueueTransformation, DocumentQueue from history.models import History from taggit.models import Tag from tags.models import TagProperties from folders.models import Folder from dynamic_search.models import RecentSearch from django_gpg.runtime import gpg +# TODO: clear the job queues bootstrap_options = {} @@ -63,18 +63,6 @@ def nuke_database(): for obj in Role.objects.all(): obj.delete() - # Delete all document in the ocr queue - for obj in QueueDocument.objects.all(): - obj.delete() - - # Delete all the transformations for a queue - for obj in QueueTransformation.objects.all(): - obj.delete() - - # Delete all the ocr document queues - for obj in DocumentQueue.objects.all(): - obj.delete() - # Delete all the remaining history events for obj in History.objects.all(): obj.delete() diff --git a/apps/checkouts/__init__.py b/apps/checkouts/__init__.py index f176cfa01a..1991c69cf1 100644 --- a/apps/checkouts/__init__.py +++ b/apps/checkouts/__init__.py @@ -3,7 +3,7 @@ from __future__ import absolute_import from 
django.utils.translation import ugettext_lazy as _ from navigation.api import bind_links, register_top_menu -from scheduler.api import register_interval_job +from scheduler.api import LocalScheduler from documents.models import Document from acls.api import class_permissions @@ -14,6 +14,7 @@ from .permissions import (PERMISSION_DOCUMENT_CHECKOUT, from .links import checkout_list, checkout_document, checkout_info, checkin_document from .models import DocumentCheckout from .tasks import task_check_expired_check_outs +from .literals import CHECK_EXPIRED_CHECK_OUTS_INTERVAL def initialize_document_checkout_extra_methods(): @@ -34,6 +35,8 @@ class_permissions(Document, [ PERMISSION_DOCUMENT_RESTRICTIONS_OVERRIDE ]) -CHECK_EXPIRED_CHECK_OUTS_INTERVAL = 60 # Lowest check out expiration allowed -register_interval_job('task_check_expired_check_outs', _(u'Check expired check out documents and checks them in.'), task_check_expired_check_outs, seconds=CHECK_EXPIRED_CHECK_OUTS_INTERVAL) +checkouts_scheduler = LocalScheduler('checkouts', _(u'Document checkouts')) +checkouts_scheduler.add_interval_job('task_check_expired_check_outs', _(u'Check expired check out documents and checks them in.'), task_check_expired_check_outs, seconds=CHECK_EXPIRED_CHECK_OUTS_INTERVAL) +checkouts_scheduler.start() + initialize_document_checkout_extra_methods() diff --git a/apps/checkouts/literals.py b/apps/checkouts/literals.py index 23e9920984..22ac3c1279 100644 --- a/apps/checkouts/literals.py +++ b/apps/checkouts/literals.py @@ -14,3 +14,5 @@ STATE_LABELS = { STATE_CHECKED_OUT: _(u'checked out'), STATE_CHECKED_IN: _(u'checked in/available'), } + +CHECK_EXPIRED_CHECK_OUTS_INTERVAL = 60 # Lowest check out expiration allowed diff --git a/apps/clustering/__init__.py b/apps/clustering/__init__.py new file mode 100644 index 0000000000..66a8549c26 --- /dev/null +++ b/apps/clustering/__init__.py @@ -0,0 +1,19 @@ +from __future__ import absolute_import + +from django.utils.translation import 
ugettext_lazy as _ + +from scheduler.api import LocalScheduler +from navigation.api import bind_links +from project_tools.api import register_tool + +from .tasks import node_heartbeat, house_keeping +from .links import tool_link, node_list +from .models import Node, ClusteringConfig + +clustering_scheduler = LocalScheduler('clustering', _(u'Clustering')) +clustering_scheduler.add_interval_job('node_heartbeat', _(u'Update a node\'s properties.'), node_heartbeat, seconds=ClusteringConfig.get().node_heartbeat_interval) +clustering_scheduler.add_interval_job('house_keeping', _(u'Check for unresponsive nodes in the cluster list.'), house_keeping, seconds=1) +clustering_scheduler.start() + +register_tool(tool_link) +bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu') diff --git a/apps/clustering/admin.py b/apps/clustering/admin.py new file mode 100644 index 0000000000..ad92b0a549 --- /dev/null +++ b/apps/clustering/admin.py @@ -0,0 +1,13 @@ +from __future__ import absolute_import + +from django.contrib import admin +from django.utils.translation import ugettext_lazy as _ + +from .models import Node + + +class NodeAdmin(admin.ModelAdmin): + list_display = ('hostname', 'cpuload', 'heartbeat', 'memory_usage') + + +admin.site.register(Node, NodeAdmin) diff --git a/apps/clustering/links.py b/apps/clustering/links.py new file mode 100644 index 0000000000..606103a6de --- /dev/null +++ b/apps/clustering/links.py @@ -0,0 +1,10 @@ +from __future__ import absolute_import + +from django.utils.translation import ugettext_lazy as _ + +from navigation.api import Link + +from .permissions import (PERMISSION_NODES_VIEW) + +tool_link = Link(text=_(u'clustering'), view='node_list', icon='server.png', permissions=[PERMISSION_NODES_VIEW]) # children_view_regex=[r'^index_setup', r'^template_node']) +node_list = Link(text=_(u'node list'), view='node_list', sprite='server', permissions=[PERMISSION_NODES_VIEW]) diff --git a/apps/clustering/migrations/0001_initial.py 
b/apps/clustering/migrations/0001_initial.py new file mode 100644 index 0000000000..7932f6ade5 --- /dev/null +++ b/apps/clustering/migrations/0001_initial.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding model 'Node' + db.create_table('clustering_node', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('hostname', self.gf('django.db.models.fields.CharField')(max_length=255)), + ('cpuload', self.gf('django.db.models.fields.PositiveIntegerField')(default=0, blank=True)), + ('heartbeat', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(2012, 7, 30, 0, 0), blank=True)), + ('memory_usage', self.gf('django.db.models.fields.FloatField')(blank=True)), + )) + db.send_create_signal('clustering', ['Node']) + + + def backwards(self, orm): + # Deleting model 'Node' + db.delete_table('clustering_node') + + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'}) + } + } + + complete_apps = ['clustering'] \ No newline at end of file diff --git a/apps/clustering/migrations/0002_auto__add_clusteringconfig.py b/apps/clustering/migrations/0002_auto__add_clusteringconfig.py new file mode 100644 index 0000000000..26acb2d393 --- /dev/null +++ b/apps/clustering/migrations/0002_auto__add_clusteringconfig.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +import datetime +from 
south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding model 'ClusteringConfig' + db.create_table('clustering_clusteringconfig', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('lock_id', self.gf('django.db.models.fields.CharField')(default=1, unique=True, max_length=1)), + ('node_time_to_live', self.gf('django.db.models.fields.PositiveIntegerField')()), + ('node_heartbeat_interval', self.gf('django.db.models.fields.PositiveIntegerField')()), + )) + db.send_create_signal('clustering', ['ClusteringConfig']) + + + def backwards(self, orm): + # Deleting model 'ClusteringConfig' + db.delete_table('clustering_clusteringconfig') + + + models = { + 'clustering.clusteringconfig': { + 'Meta': {'object_name': 'ClusteringConfig'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'}), + 'node_heartbeat_interval': ('django.db.models.fields.PositiveIntegerField', [], {}), + 'node_time_to_live': ('django.db.models.fields.PositiveIntegerField', [], {}) + }, + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'}) + } + } + + complete_apps = ['clustering'] \ No newline at end of file diff --git a/apps/clustering/migrations/0003_auto__chg_field_node_cpuload.py b/apps/clustering/migrations/0003_auto__chg_field_node_cpuload.py new file mode 100644 index 
0000000000..7cfbc5b1c3 --- /dev/null +++ b/apps/clustering/migrations/0003_auto__chg_field_node_cpuload.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + + # Changing field 'Node.cpuload' + db.alter_column('clustering_node', 'cpuload', self.gf('django.db.models.fields.FloatField')()) + + def backwards(self, orm): + + # Changing field 'Node.cpuload' + db.alter_column('clustering_node', 'cpuload', self.gf('django.db.models.fields.PositiveIntegerField')()) + + models = { + 'clustering.clusteringconfig': { + 'Meta': {'object_name': 'ClusteringConfig'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'}), + 'node_heartbeat_interval': ('django.db.models.fields.PositiveIntegerField', [], {'default': '1'}), + 'node_time_to_live': ('django.db.models.fields.PositiveIntegerField', [], {'default': '5'}) + }, + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}) + } + } + + complete_apps = ['clustering'] \ No newline at end of file diff --git a/apps/clustering/migrations/__init__.py b/apps/clustering/migrations/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/clustering/models.py b/apps/clustering/models.py new file mode 100644 index 0000000000..b083829f74 --- /dev/null 
+++ b/apps/clustering/models.py @@ -0,0 +1,89 @@ +from __future__ import absolute_import + +import os +import datetime +import platform + +import psutil + +from django.db import models, IntegrityError, transaction +from django.db import close_connection +from django.utils.translation import ugettext_lazy as _ +from django.utils.translation import ugettext + +from common.models import Singleton + +DEFAULT_NODE_TTL = 5 +DEFAULT_NODE_HEARTBEAT_INTERVAL = 1 + + +class NodeManager(models.Manager): + def myself(self): + node, created = self.model.objects.get_or_create(hostname=platform.node()) + node.refresh() + if created: + # Store the refresh data because is a new instance + node.save() + return node + + +class Node(models.Model): + hostname = models.CharField(max_length=255, verbose_name=_(u'hostname')) + cpuload = models.FloatField(blank=True, default=0.0, verbose_name=_(u'cpu load')) + heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now(), verbose_name=_(u'last heartbeat check')) + memory_usage = models.FloatField(blank=True, default=0.0, verbose_name=_(u'memory usage')) + + objects = NodeManager() + + @classmethod + def platform_info(cls): + return { + 'cpuload': psutil.cpu_percent(), + 'memory_usage': psutil.phymem_usage().percent + } + + def __unicode__(self): + return self.hostname + + def refresh(self): + if self.hostname == platform.node(): + # Make we can only update ourselves + info = Node.platform_info() + self.cpuload = info['cpuload'] + self.memory_usage = info['memory_usage'] + + def save(self, *args, **kwargs): + self.heartbeat = datetime.datetime.now() + return super(Node, self).save(*args, **kwargs) + + class Meta: + verbose_name = _(u'node') + verbose_name_plural = _(u'nodes') + + +class ClusteringConfigManager(models.Manager): + def dead_nodes(self): + return Node.objects.filter(heartbeat__lt=datetime.datetime.now() - datetime.timedelta(seconds=self.model.get().node_time_to_live)) + + def delete_dead_nodes(self): + 
self.dead_nodes().delete() + + def zombiest_node(self): + try: + return self.dead_nodes().order_by('-heartbeat')[0] + except IndexError: + return None + + +class ClusteringConfig(Singleton): + node_time_to_live = models.PositiveIntegerField(verbose_name=(u'time to live (in seconds)'), default=DEFAULT_NODE_TTL) # After this time a worker is considered dead + node_heartbeat_interval = models.PositiveIntegerField(verbose_name=(u'heartbeat interval'), default=DEFAULT_NODE_HEARTBEAT_INTERVAL) + # TODO: add validation, interval cannot be greater than TTL + + objects = ClusteringConfigManager() + + def __unicode__(self): + return ugettext('clustering config') + + class Meta: + verbose_name = verbose_name_plural = _(u'clustering config') diff --git a/apps/clustering/permissions.py b/apps/clustering/permissions.py new file mode 100644 index 0000000000..6065936140 --- /dev/null +++ b/apps/clustering/permissions.py @@ -0,0 +1,8 @@ +from __future__ import absolute_import + +from django.utils.translation import ugettext_lazy as _ + +from permissions.models import PermissionNamespace, Permission + +namespace = PermissionNamespace('clustering', _(u'Clustering')) +PERMISSION_NODES_VIEW = Permission.objects.register(namespace, 'nodes_view', _(u'View the nodes in a Mayan cluster')) diff --git a/apps/clustering/static/images/icons/server.png b/apps/clustering/static/images/icons/server.png new file mode 100644 index 0000000000..5b1fb2d7e9 Binary files /dev/null and b/apps/clustering/static/images/icons/server.png differ diff --git a/apps/clustering/tasks.py b/apps/clustering/tasks.py new file mode 100644 index 0000000000..c5938047d9 --- /dev/null +++ b/apps/clustering/tasks.py @@ -0,0 +1,25 @@ +from __future__ import absolute_import + +import logging + +from lock_manager.decorators import simple_locking + +from .models import Node, ClusteringConfig + +LOCK_EXPIRE = 10 + +logger = logging.getLogger(__name__) + + +@simple_locking('node_heartbeat', 10) +def node_heartbeat(): + 
logger.debug('starting') + node = Node.objects.myself() + node.save() + + +@simple_locking('house_keeping', 10) +def house_keeping(): + logger.debug('starting') + ClusteringConfig.objects.delete_dead_nodes() + diff --git a/apps/clustering/urls.py b/apps/clustering/urls.py new file mode 100644 index 0000000000..e43cf0041d --- /dev/null +++ b/apps/clustering/urls.py @@ -0,0 +1,6 @@ +from django.conf.urls.defaults import patterns, url + + +urlpatterns = patterns('clustering.views', + url(r'^node/list/$', 'node_list', (), 'node_list'), +) diff --git a/apps/clustering/views.py b/apps/clustering/views.py new file mode 100644 index 0000000000..e00f7b4da2 --- /dev/null +++ b/apps/clustering/views.py @@ -0,0 +1,67 @@ +from __future__ import absolute_import + +from django.shortcuts import render_to_response +from django.template import RequestContext +from django.utils.translation import ugettext_lazy as _ +from django.shortcuts import get_object_or_404 +from django.db.models.loading import get_model +from django.http import Http404 +from django.core.exceptions import PermissionDenied + +from permissions.models import Permission +from common.utils import encapsulate +from acls.models import AccessEntry + +from .models import Node +from .permissions import PERMISSION_NODES_VIEW + + +def node_list(request): + Permission.objects.check_permissions(request.user, [PERMISSION_NODES_VIEW]) + + context = { + 'object_list': Node.objects.all(), + 'title': _(u'nodes'), + 'extra_columns_preffixed': [ + { + 'name': _(u'hostname'), + 'attribute': 'hostname', + }, + { + 'name': _(u'cpu load'), + 'attribute': 'cpuload', + }, + { + 'name': _(u'heartbeat'), + 'attribute': 'heartbeat', + }, + { + 'name': _(u'memory usage'), + 'attribute': 'memory_usage', + }, + + ], + 'hide_object': True, + } + + return render_to_response('generic_list.html', context, + context_instance=RequestContext(request)) + + +def node_workers(request, node_pk): + node = get_object_or_404(Node, pk=node_pk) + + try: + 
Permission.objects.check_permissions(request.user, [PERMISSION_NODES_VIEW]) + except PermissionDenied: + AccessEntry.objects.check_access(PERMISSION_NODES_VIEW, request.user, node) + + context = { + 'object_list': node.workers.all(), + 'title': _(u'workers for node: %s') % node, + 'object': node, + 'hide_object': True, + } + + return render_to_response('generic_list.html', context, + context_instance=RequestContext(request)) diff --git a/apps/common/models.py b/apps/common/models.py index d0b0f8414d..dc7be8316e 100644 --- a/apps/common/models.py +++ b/apps/common/models.py @@ -17,7 +17,11 @@ class Singleton(models.Model): lock_id = models.CharField(max_length=1, default=SINGLETON_LOCK_ID, editable=False, verbose_name=_(u'lock field'), unique=True) objects = SingletonManager() - + + @classmethod + def get(cls): + return cls.objects.get() + def save(self, *args, **kwargs): self.id = 1 super(Singleton, self).save(*args, **kwargs) diff --git a/apps/dynamic_search/__init__.py b/apps/dynamic_search/__init__.py index 55718876be..ae5422c437 100644 --- a/apps/dynamic_search/__init__.py +++ b/apps/dynamic_search/__init__.py @@ -9,9 +9,8 @@ from django.core.management import call_command from navigation.api import register_sidebar_template, bind_links, Link from documents.models import Document -from scheduler.runtime import scheduler +from scheduler.api import LocalScheduler from signaler.signals import post_update_index, pre_update_index -from scheduler.api import register_interval_job from lock_manager import Lock, LockError from .models import IndexableObject @@ -36,7 +35,7 @@ def scheduler_shutdown_pre_update_index(sender, mayan_runtime, **kwargs): # Only shutdown the scheduler if the command is called from the command # line if not mayan_runtime: - scheduler.shutdown() + LocalScheduler.shutdown_all() def search_index_update(): @@ -61,4 +60,6 @@ def search_index_update(): bind_links(['search', 'search_advanced', 'results'], [search], menu_name='form_header') 
bind_links(['results'], [search_again], menu_name='sidebar') -register_interval_job('search_index_update', _(u'Update the search index with the most recent modified documents.'), search_index_update, seconds=INDEX_UPDATE_INTERVAL) +dynamic_search_scheduler = LocalScheduler('search', _(u'Search')) +dynamic_search_scheduler.add_interval_job('search_index_update', _(u'Update the search index with the most recent modified documents.'), search_index_update, seconds=INDEX_UPDATE_INTERVAL) +dynamic_search_scheduler.start() diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index e69de29bb2..66fb010bff 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -0,0 +1,37 @@ +from __future__ import absolute_import + +from django.utils.translation import ugettext_lazy as _ + +from scheduler.api import LocalScheduler +from navigation.api import bind_links, register_model_list_columns +from project_tools.api import register_tool +from common.utils import encapsulate + +from clustering.models import Node + +from .models import JobQueue +from .tasks import job_queue_poll +from .links import (node_workers, job_queues, tool_link, + job_queue_items_pending, job_queue_items_error, job_queue_items_active) + +#TODO: fix this, make it cluster wide +JOB_QUEUE_POLL_INTERVAL = 1 + +job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor')) +job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JOB_QUEUE_POLL_INTERVAL) +job_processor_scheduler.start() + +register_tool(tool_link) +bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu') +bind_links([JobQueue], [job_queue_items_pending, job_queue_items_active, job_queue_items_error]) + +bind_links([Node], [node_workers]) + +Node.add_to_class('workers', lambda node: node.worker_set) + +register_model_list_columns(Node, [ + { + 'name': _(u'active workers'), + 'attribute': 
encapsulate(lambda x: x.workers().all().count()) + }, +]) diff --git a/apps/job_processor/admin.py b/apps/job_processor/admin.py new file mode 100644 index 0000000000..4739a8df26 --- /dev/null +++ b/apps/job_processor/admin.py @@ -0,0 +1,23 @@ +from __future__ import absolute_import + +from django.contrib import admin +from django.utils.translation import ugettext_lazy as _ + +from .models import JobQueue, JobQueueItem + + +class JobQueueItemInline(admin.StackedInline): + model = JobQueueItem + + +class JobQueueAdmin(admin.ModelAdmin): + model = JobQueue + list_display = ('name', 'label', 'total_items') + inlines = [JobQueueItemInline] + + def total_items(self, obj): + return obj.items.all().count() + total_items.short_description = _(u'total items') + + +admin.site.register(JobQueue, JobQueueAdmin) diff --git a/apps/job_processor/api.py b/apps/job_processor/api.py deleted file mode 100644 index 00b9736fef..0000000000 --- a/apps/job_processor/api.py +++ /dev/null @@ -1,2 +0,0 @@ -def process_job(func, *args, **kwargs): - return func(*args, **kwargs) diff --git a/apps/job_processor/exceptions.py b/apps/job_processor/exceptions.py new file mode 100644 index 0000000000..bac36e7b37 --- /dev/null +++ b/apps/job_processor/exceptions.py @@ -0,0 +1,14 @@ +#class WorkerAlreadyDisabled(Exception): +# pass + + +#class WorkerAlreadyEnabled(Exception): +# pass + + +class JobQueuePushError(Exception): + pass + + +class JobQueueNoPendingJobs(Exception): + pass diff --git a/apps/job_processor/links.py b/apps/job_processor/links.py new file mode 100644 index 0000000000..a3fffee82b --- /dev/null +++ b/apps/job_processor/links.py @@ -0,0 +1,18 @@ +from __future__ import absolute_import + +from django.utils.translation import ugettext_lazy as _ + +from navigation.api import Link + +from clustering.permissions import PERMISSION_NODES_VIEW + +from .permissions import PERMISSION_JOB_QUEUE_VIEW + + +node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', 
sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW]) + +tool_link = Link(text=_(u'job queues'), view='job_queues', icon='hourglass.png', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queues = Link(text=_(u'job queue list'), view='job_queues', sprite='hourglass', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) diff --git a/apps/job_processor/literals.py b/apps/job_processor/literals.py new file mode 100644 index 0000000000..a8a1bdf7aa --- /dev/null +++ b/apps/job_processor/literals.py @@ -0,0 +1,19 @@ +from django.utils.translation import ugettext_lazy as _ + +WORKER_STATE_RUNNING = 'r' +WORKER_STATE_DEAD = 'd' + +WORKER_STATE_CHOICES = ( + (WORKER_STATE_RUNNING, _(u'running')), + (WORKER_STATE_DEAD, _(u'dead')), +) + +JOB_STATE_PENDING = 'p' +JOB_STATE_PROCESSING = 'r' +JOB_STATE_ERROR = 'e' + +JOB_STATE_CHOICES = ( + (JOB_STATE_PENDING, _(u'pending')), + (JOB_STATE_PROCESSING, _(u'processing')), + (JOB_STATE_ERROR, _(u'error')), +) diff --git a/apps/job_processor/migrations/0001_initial.py b/apps/job_processor/migrations/0001_initial.py new file mode 100644 index 0000000000..771b740999 --- /dev/null +++ b/apps/job_processor/migrations/0001_initial.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding model 'Node' + db.create_table('job_processor_node', ( + ('id', 
self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('hostname', self.gf('django.db.models.fields.CharField')(max_length=255)), + ('cpuload', self.gf('django.db.models.fields.PositiveIntegerField')(default=0, blank=True)), + ('heartbeat', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(2012, 7, 30, 0, 0), blank=True)), + ('memory_usage', self.gf('django.db.models.fields.FloatField')(blank=True)), + )) + db.send_create_signal('job_processor', ['Node']) + + # Adding model 'JobQueue' + db.create_table('job_processor_jobqueue', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('name', self.gf('django.db.models.fields.CharField')(unique=True, max_length=32)), + ('unique_jobs', self.gf('django.db.models.fields.BooleanField')(default=True)), + )) + db.send_create_signal('job_processor', ['JobQueue']) + + # Adding model 'JobQueueItem' + db.create_table('job_processor_jobqueueitem', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('job_queue', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['job_processor.JobQueue'])), + ('creation_datetime', self.gf('django.db.models.fields.DateTimeField')()), + ('unique_id', self.gf('django.db.models.fields.CharField')(unique=True, max_length=64, blank=True)), + ('job_type', self.gf('django.db.models.fields.CharField')(max_length=32)), + ('kwargs', self.gf('django.db.models.fields.TextField')()), + ('state', self.gf('django.db.models.fields.CharField')(default='p', max_length=4)), + ('result', self.gf('django.db.models.fields.TextField')(blank=True)), + )) + db.send_create_signal('job_processor', ['JobQueueItem']) + + # Adding model 'Worker' + db.create_table('job_processor_worker', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('node', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['job_processor.Node'])), + ('name', self.gf('django.db.models.fields.CharField')(max_length=255)), + 
('creation_datetime', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(2012, 7, 30, 0, 0))), + ('heartbeat', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(2012, 7, 30, 0, 0), blank=True)), + ('state', self.gf('django.db.models.fields.CharField')(default='r', max_length=4)), + )) + db.send_create_signal('job_processor', ['Worker']) + + + def backwards(self, orm): + # Deleting model 'Node' + db.delete_table('job_processor_node') + + # Deleting model 'JobQueue' + db.delete_table('job_processor_jobqueue') + + # Deleting model 'JobQueueItem' + db.delete_table('job_processor_jobqueueitem') + + # Deleting model 'Worker' + db.delete_table('job_processor_worker') + + + models = { + 'job_processor.jobqueue': { + 'Meta': {'object_name': 'JobQueue'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), + 'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'}) + }, + 'job_processor.jobqueueitem': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}), + 'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}), + 'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'}) + }, + 'job_processor.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': 
'0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'}) + }, + 'job_processor.worker': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.Node']"}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}) + } + } + + complete_apps = ['job_processor'] \ No newline at end of file diff --git a/apps/job_processor/migrations/0002_auto__del_node__chg_field_worker_node.py b/apps/job_processor/migrations/0002_auto__del_node__chg_field_worker_node.py new file mode 100644 index 0000000000..6edc8dcbc3 --- /dev/null +++ b/apps/job_processor/migrations/0002_auto__del_node__chg_field_worker_node.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Deleting model 'Node' + db.delete_table('job_processor_node') + + + # Changing field 'Worker.node' + db.alter_column('job_processor_worker', 'node_id', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['clustering.Node'])) + + def backwards(self, orm): + # Adding model 'Node' + 
db.create_table('job_processor_node', ( + ('memory_usage', self.gf('django.db.models.fields.FloatField')(blank=True)), + ('hostname', self.gf('django.db.models.fields.CharField')(max_length=255)), + ('cpuload', self.gf('django.db.models.fields.PositiveIntegerField')(default=0, blank=True)), + ('heartbeat', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(2012, 7, 30, 0, 0), blank=True)), + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + )) + db.send_create_signal('job_processor', ['Node']) + + + # Changing field 'Worker.node' + db.alter_column('job_processor_worker', 'node_id', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['job_processor.Node'])) + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'}) + }, + 'job_processor.jobqueue': { + 'Meta': {'object_name': 'JobQueue'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), + 'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'}) + }, + 'job_processor.jobqueueitem': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}), + 'job_type': 
('django.db.models.fields.CharField', [], {'max_length': '32'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}), + 'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'}) + }, + 'job_processor.worker': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}) + } + } + + complete_apps = ['job_processor'] \ No newline at end of file diff --git a/apps/job_processor/migrations/0003_auto__add_field_worker_job_queue_item.py b/apps/job_processor/migrations/0003_auto__add_field_worker_job_queue_item.py new file mode 100644 index 0000000000..31469b6b5b --- /dev/null +++ b/apps/job_processor/migrations/0003_auto__add_field_worker_job_queue_item.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding field 'Worker.job_queue_item' + db.add_column('job_processor_worker', 'job_queue_item', + self.gf('django.db.models.fields.related.ForeignKey')(default=1, to=orm['job_processor.JobQueueItem']), + keep_default=False) + + + def backwards(self, orm): + # Deleting field 
'Worker.job_queue_item' + db.delete_column('job_processor_worker', 'job_queue_item_id') + + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'}) + }, + 'job_processor.jobqueue': { + 'Meta': {'object_name': 'JobQueue'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), + 'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'}) + }, + 'job_processor.jobqueueitem': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}), + 'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}), + 'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'}) + }, + 'job_processor.worker': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)'}), + 
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}) + } + } + + complete_apps = ['job_processor'] \ No newline at end of file diff --git a/apps/job_processor/migrations/0004_auto__del_field_worker_name__add_field_worker_pid.py b/apps/job_processor/migrations/0004_auto__del_field_worker_name__add_field_worker_pid.py new file mode 100644 index 0000000000..77c4260b45 --- /dev/null +++ b/apps/job_processor/migrations/0004_auto__del_field_worker_name__add_field_worker_pid.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Deleting field 'Worker.name' + db.delete_column('job_processor_worker', 'name') + + # Adding field 'Worker.pid' + db.add_column('job_processor_worker', 'pid', + self.gf('django.db.models.fields.PositiveIntegerField')(default=1, max_length=255), + keep_default=False) + + + def backwards(self, orm): + + # User chose to not deal with backwards NULL issues for 'Worker.name' + raise RuntimeError("Cannot reverse this migration. 
'Worker.name' and its values cannot be restored.") + # Deleting field 'Worker.pid' + db.delete_column('job_processor_worker', 'pid') + + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}) + }, + 'job_processor.jobqueue': { + 'Meta': {'object_name': 'JobQueue'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), + 'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'}) + }, + 'job_processor.jobqueueitem': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}), + 'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}), + 'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'}) + }, + 'job_processor.worker': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 
'datetime.datetime(2012, 7, 30, 0, 0)'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}), + 'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}), + 'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}) + } + } + + complete_apps = ['job_processor'] \ No newline at end of file diff --git a/apps/job_processor/migrations/__init__.py b/apps/job_processor/migrations/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index 71a8362390..ed10c670bb 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -1,3 +1,241 @@ -from django.db import models +from __future__ import absolute_import -# Create your models here. 
+import os +import datetime +import uuid +import hashlib +import platform +from multiprocessing import Process + +import psutil + +from django.db import models, IntegrityError, transaction +from django.db import close_connection +from django.contrib.contenttypes import generic +from django.utils.translation import ugettext_lazy as _ +from django.utils.translation import ugettext +from django.utils.simplejson import loads, dumps + +from common.models import Singleton +from clustering.models import Node + +from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING, + JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES, + WORKER_STATE_RUNNING) +from .exceptions import JobQueuePushError, JobQueueNoPendingJobs +#from .exceptions import (WorkerAlreadyDisabled, WorkerAlreadyEnabled) + +job_queue_labels = {} +job_types_registry = {} + + +class Job(object): + def __init__(self, function, job_queue_item): + close_connection() + # Run sync or launch async subprocess + # OR launch 2 processes: monitor & actual process + node = Node.objects.myself() + worker = Worker.objects.create(node=node, pid=os.getpid(), job_queue_item=job_queue_item) + try: + close_connection() + transaction.commit_on_success(function)(**loads(job_queue_item.kwargs)) + #function(**loads(job_queue_item.kwargs)) + except Exception, exc: + close_connection() + transaction.rollback() + close_connection() + def set_state_error(): + job_queue_item.result = exc + job_queue_item.state = JOB_STATE_ERROR + job_queue_item.save() + transaction.commit_on_success(set_state_error)() + else: + job_queue_item.delete() + finally: + worker.delete() + + +class JobType(object): + def __init__(self, name, label, function): + self.name = name + self.label = label + self.function = function + job_types_registry[self.name] = self + + def __unicode__(self): + return unicode(self.label) + + def run(self, job_queue_item, **kwargs): + job_queue_item.state = JOB_STATE_PROCESSING + job_queue_item.save() + p = 
Process(target=Job, args=(self.function, job_queue_item,)) + p.start() + + +class JobQueueManager(models.Manager): + def get_or_create(self, *args, **kwargs): + job_queue_labels[kwargs.get('name')] = kwargs.get('defaults', {}).get('label') + return super(JobQueueManager, self).get_or_create(*args, **kwargs) + + +class JobQueue(models.Model): + # TODO: support for stopping and starting job queues + # Internal name + name = models.CharField(max_length=32, verbose_name=_(u'name'), unique=True) + unique_jobs = models.BooleanField(verbose_name=_(u'unique jobs'), default=True) + + objects = JobQueueManager() + + def __unicode__(self): + return unicode(self.label) or self.names + + @property + def label(self): + return job_queue_labels.get(self.name) + + def push(self, job_type, **kwargs): # TODO: add replace flag + job_queue_item = JobQueueItem(job_queue=self, job_type=job_type.name, kwargs=dumps(kwargs)) + job_queue_item.save() + return job_queue_item + + #def pull(self): + # queue_item_qs = JobQueueItem.objects.filter(queue=self).order_by('-creation_datetime') + # if queue_item_qs: + # queue_item = queue_item_qs[0] + # queue_item.delete() + # return loads(queue_item.data) + + def get_oldest_pending_job(self): + try: + return self.pending_jobs.all().order_by('-creation_datetime')[0] + except IndexError: + raise JobQueueNoPendingJobs + + @property + def pending_jobs(self): + return self.items.filter(state=JOB_STATE_PENDING) + + @property + def error_jobs(self): + return self.items.filter(state=JOB_STATE_ERROR) + + @property + def active_jobs(self): + return self.items.filter(state=JOB_STATE_PROCESSING) + + @property + def items(self): + return self.jobqueueitem_set + + def empty(self): + self.items.all().delete() + + def save(self, *args, **kwargs): + label = getattr(self, 'label', None) + if label: + job_queue_labels[self.name] = label + return super(JobQueue, self).save(*args, **kwargs) + + # TODO: custom runtime methods + + class Meta: + verbose_name = _(u'job queue') 
+ verbose_name_plural = _(u'job queues') + + +class JobQueueItem(models.Model): + # TODO: add re-queue + job_queue = models.ForeignKey(JobQueue, verbose_name=_(u'job queue')) + creation_datetime = models.DateTimeField(verbose_name=_(u'creation datetime'), editable=False) + unique_id = models.CharField(blank=True, max_length=64, verbose_name=_(u'id'), unique=True, editable=False) + job_type = models.CharField(max_length=32, verbose_name=_(u'job type')) + kwargs = models.TextField(verbose_name=_(u'keyword arguments')) + state = models.CharField(max_length=4, + choices=JOB_STATE_CHOICES, + default=JOB_STATE_PENDING, + verbose_name=_(u'state')) + result = models.TextField(blank=True, verbose_name=_(u'result')) + + def __unicode__(self): + return self.unique_id + + def save(self, *args, **kwargs): + self.creation_datetime = datetime.datetime.now() + + if self.job_queue.unique_jobs: + self.unique_id = hashlib.sha256(u'%s-%s' % (self.job_type, self.kwargs)).hexdigest() + else: + self.unique_id = unicode(uuid.uuid4()) + try: + super(JobQueueItem, self).save(*args, **kwargs) + except IntegrityError: + # TODO: Maybe replace instead of rasining exception w/ replace flag + raise JobQueuePushError + + def get_job_type(self): + return job_types_registry.get(self.job_type) + + def run(self): + job_type_instance = self.get_job_type() + job_type_instance.run(self) + + @property + def worker(self): + try: + return self.worker_set.get() + except Worker.DoesNotExist: + return None + + class Meta: + ordering = ('creation_datetime',) + verbose_name = _(u'job queue item') + verbose_name_plural = _(u'job queue items') + + +class Worker(models.Model): + node = models.ForeignKey(Node, verbose_name=_(u'node')) + pid = models.PositiveIntegerField(max_length=255, verbose_name=_(u'name')) + creation_datetime = models.DateTimeField(verbose_name=_(u'creation datetime'), default=lambda: datetime.datetime.now(), editable=False) + heartbeat = models.DateTimeField(blank=True, 
default=datetime.datetime.now(), verbose_name=_(u'heartbeat check')) + state = models.CharField(max_length=4, + choices=WORKER_STATE_CHOICES, + default=WORKER_STATE_RUNNING, + verbose_name=_(u'state')) + job_queue_item = models.ForeignKey(JobQueueItem, verbose_name=_(u'job queue item')) + + def __unicode__(self): + return u'%s-%s' % (self.node.hostname, self.pid) + + #def disable(self): + # if self.state == WORKER_STATE_DISABLED: + # raise WorkerAlreadyDisabled + # + # self.state = WORKER_STATE_DISABLED + # self.save() + # + #def enable(self): + # if self.state == WORKER_STATE_ENABLED: + # raise WorkerAlreadyEnabled + # + # self.state = WORKER_STATE_ENABLED + # self.save() + # + #def is_enabled(self): + # return self.state == WORKER_STATE_ENABLED + + class Meta: + ordering = ('creation_datetime',) + verbose_name = _(u'worker') + verbose_name_plural = _(u'workers') + +""" +class JobProcessingConfig(Singleton): + worker_time_to_live = models.PositiveInteger(verbose_name=(u'time to live (in seconds)') # After this time a worker is considered dead + worker_heartbeat_interval = models.PositiveInteger(verbose_name=(u'heartbeat interval') + + def __unicode__(self): + return ugettext('Workers configuration') + + class Meta: + verbose_name = verbose_name_plural = _(u'Workers configuration') +""" diff --git a/apps/job_processor/permissions.py b/apps/job_processor/permissions.py new file mode 100644 index 0000000000..74c06b609a --- /dev/null +++ b/apps/job_processor/permissions.py @@ -0,0 +1,8 @@ +from __future__ import absolute_import + +from django.utils.translation import ugettext_lazy as _ + +from permissions.models import PermissionNamespace, Permission + +namespace = PermissionNamespace('job_processor', _(u'Job processor')) +PERMISSION_JOB_QUEUE_VIEW = Permission.objects.register(namespace, 'job_queue_view', _(u'View the job queues in a Mayan cluster')) diff --git a/apps/job_processor/static/images/icons/hourglass.png 
b/apps/job_processor/static/images/icons/hourglass.png new file mode 100755 index 0000000000..cd14446b9f Binary files /dev/null and b/apps/job_processor/static/images/icons/hourglass.png differ diff --git a/apps/job_processor/tasks.py b/apps/job_processor/tasks.py new file mode 100644 index 0000000000..fac91c03d3 --- /dev/null +++ b/apps/job_processor/tasks.py @@ -0,0 +1,43 @@ +from __future__ import absolute_import + +import logging + +from lock_manager import Lock, LockError +from clustering.models import Node + +from .models import JobQueue +from .exceptions import JobQueueNoPendingJobs + +LOCK_EXPIRE = 10 +MAX_CPU_LOAD = 90.0 +MAX_MEMORY_USAGE = 90.0 + +logger = logging.getLogger(__name__) + + +def job_queue_poll(): + logger.debug('starting') + + node = Node.objects.myself() + if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE: + # Poll job queues if node is not overloaded + lock_id = u'job_queue_poll' + try: + lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE) + except LockError: + pass + except Exception: + lock.release() + raise + else: + for job_queue in JobQueue.objects.all(): + try: + job_item = job_queue.get_oldest_pending_job() + job_item.run() + except JobQueueNoPendingJobs: + logger.debug('no pending jobs for job queue: %s' % job_queue) + lock.release() + else: + logger.debug('CPU load or memory usage over limit') + + diff --git a/apps/job_processor/urls.py b/apps/job_processor/urls.py new file mode 100644 index 0000000000..d932d04fdc --- /dev/null +++ b/apps/job_processor/urls.py @@ -0,0 +1,10 @@ +from django.conf.urls.defaults import patterns, url + + +urlpatterns = patterns('job_processor.views', + url(r'^node/(?P\d+)/workers/$', 'node_workers', (), 'node_workers'), + url(r'^queue/list/$', 'job_queues', (), 'job_queues'), + url(r'^queue/(?P\d+)/items/pending/$', 'job_queue_items', {'pending_jobs': True}, 'job_queue_items_pending'), + url(r'^queue/(?P\d+)/items/error/$', 'job_queue_items', {'error_jobs' :True}, 
'job_queue_items_error'), + url(r'^queue/(?P\d+)/items/active/$', 'job_queue_items', {'active_jobs' :True}, 'job_queue_items_active'), +) diff --git a/apps/job_processor/views.py b/apps/job_processor/views.py index 60f00ef0ef..101697d46a 100644 --- a/apps/job_processor/views.py +++ b/apps/job_processor/views.py @@ -1 +1,153 @@ -# Create your views here. +from __future__ import absolute_import + +from django.shortcuts import render_to_response +from django.template import RequestContext +from django.utils.translation import ugettext_lazy as _ +from django.shortcuts import get_object_or_404 +from django.contrib.contenttypes.models import ContentType +from django.db.models.loading import get_model +from django.http import Http404 +from django.core.exceptions import PermissionDenied +from permissions.models import Permission +from common.utils import encapsulate +from acls.models import AccessEntry +from clustering.permissions import PERMISSION_NODES_VIEW +from clustering.models import Node + +from .models import JobQueue +from .permissions import PERMISSION_JOB_QUEUE_VIEW + + +def node_workers(request, node_pk): + node = get_object_or_404(Node, pk=node_pk) + + try: + Permission.objects.check_permissions(request.user, [PERMISSION_NODES_VIEW]) + except PermissionDenied: + AccessEntry.objects.check_access(PERMISSION_NODES_VIEW, request.user, node) + + context = { + 'object_list': node.workers().all(), + 'title': _(u'workers for node: %s') % node, + 'object': node, + 'hide_link': True, + 'extra_columns': [ + { + 'name': _(u'created'), + 'attribute': 'creation_datetime', + }, + { + 'name': _(u'heartbeat'), + 'attribute': 'heartbeat', + }, + { + 'name': _(u'state'), + 'attribute': 'get_state_display', + }, + { + 'name': _(u'job queue item'), + 'attribute': 'job_queue_item', + }, + { + 'name': _(u'job type'), + 'attribute': 'job_queue_item.get_job_type', + }, + { + 'name': _(u'job queue'), + 'attribute': 'job_queue_item.job_queue', + }, + ], + } + + return 
render_to_response('generic_list.html', context, + context_instance=RequestContext(request)) + + +def job_queues(request): + # TODO: permissiong list filtering + Permission.objects.check_permissions(request.user, [PERMISSION_JOB_QUEUE_VIEW]) + + context = { + 'object_list': JobQueue.objects.all(), + 'title': _(u'job queue'), + 'hide_link': True, + 'extra_columns': [ + { + 'name': _(u'pending jobs'), + 'attribute': 'pending_jobs.count', + }, + { + 'name': _(u'active jobs'), + 'attribute': 'active_jobs.count', + }, + { + 'name': _(u'error jobs'), + 'attribute': 'error_jobs.count', + }, + ], + } + + return render_to_response('generic_list.html', context, + context_instance=RequestContext(request)) + + +def job_queue_items(request, job_queue_pk, pending_jobs=False, error_jobs=False, active_jobs=False): + job_queue = get_object_or_404(JobQueue, pk=job_queue_pk) + + try: + Permission.objects.check_permissions(request.user, [PERMISSION_JOB_QUEUE_VIEW]) + except PermissionDenied: + AccessEntry.objects.check_access(PERMISSION_JOB_QUEUE_VIEW, request.user, job_queue) + + jobs = set() + if pending_jobs: + jobs = job_queue.pending_jobs.all() + title = _(u'pending jobs for queue: %s') % job_queue + + if error_jobs: + jobs = job_queue.error_jobs.all() + title = _(u'error jobs for queue: %s') % job_queue + + if active_jobs: + jobs = job_queue.active_jobs.all() + title = _(u'active jobs for queue: %s') % job_queue + + context = { + 'object_list': jobs, + 'title': title, + 'object': job_queue, + 'hide_link': True, + 'extra_columns': [ + { + 'name': _(u'created'), + 'attribute': 'creation_datetime', + }, + { + 'name': _(u'job type'), + 'attribute': 'get_job_type', + }, + { + 'name': _(u'arguments'), + 'attribute': 'kwargs', + }, + ], + } + + if active_jobs: + context['extra_columns'].append( + { + 'name': _(u'worker'), + 'attribute': encapsulate(lambda x: x.worker or _(u'Unknown')), + } + ) + + if error_jobs: + context['extra_columns'].append( + { + 'name': _(u'result'), + 
'attribute': 'result', + } + ) + + return render_to_response('generic_list.html', context, + context_instance=RequestContext(request)) diff --git a/apps/lock_manager/__init__.py b/apps/lock_manager/__init__.py index 61fe7fbf78..886e3fb0f8 100644 --- a/apps/lock_manager/__init__.py +++ b/apps/lock_manager/__init__.py @@ -1,6 +1,10 @@ from __future__ import absolute_import +import logging + from .exceptions import LockError from .models import Lock as LockModel +logger = logging.getLogger(__name__) + Lock = LockModel.objects diff --git a/apps/lock_manager/decorators.py b/apps/lock_manager/decorators.py new file mode 100644 index 0000000000..c98a7ae1c9 --- /dev/null +++ b/apps/lock_manager/decorators.py @@ -0,0 +1,33 @@ +from __future__ import absolute_import + +from functools import wraps + +from . import logger +from . import Lock +from .exceptions import LockError + + +def simple_locking(lock_id, expiration=None): + """ + A decorator that wraps a function in a single lock getting algorithm + """ + def inner_decorator(function): + def wrapper(*args, **kwargs): + try: + # Trying to acquire lock + lock = Lock.acquire_lock(lock_id, expiration) + except LockError: + # Unable to acquire lock + pass + except Exception: + # Unhandled error, release lock + lock.release() + raise + else: + # Lock acquired, proceed normally, release lock afterwards + logger.debug('acquired lock: %s' % lock_id) + result = function(*args, **kwargs) + lock.release() + return result + return wraps(function)(wrapper) + return inner_decorator diff --git a/apps/lock_manager/managers.py b/apps/lock_manager/managers.py index 99145e8c5a..4096c5e85b 100644 --- a/apps/lock_manager/managers.py +++ b/apps/lock_manager/managers.py @@ -31,7 +31,7 @@ class LockManager(models.Manager): except self.model.DoesNotExist: # Table based locking logger.debug('lock: %s does not exist' % name) - raise LockError('Unable to acquire lock') + raise LockError('unable to acquire lock: %s' % name) if datetime.datetime.now() > 
lock.creation_datetime + datetime.timedelta(seconds=lock.timeout): logger.debug('reseting deleting stale lock: %s' % name) diff --git a/apps/lock_manager/models.py b/apps/lock_manager/models.py index a70af8230c..d7d47815bd 100644 --- a/apps/lock_manager/models.py +++ b/apps/lock_manager/models.py @@ -2,7 +2,8 @@ from __future__ import absolute_import import datetime -from django.db import models +from django.db import close_connection +from django.db import (models, transaction, DatabaseError) from django.utils.translation import ugettext_lazy as _ from .managers import LockManager @@ -26,13 +27,17 @@ class Lock(models.Model): super(Lock, self).save(*args, **kwargs) + @transaction.commit_on_success def release(self): + close_connection() try: lock = Lock.objects.get(name=self.name, creation_datetime=self.creation_datetime) lock.delete() except Lock.DoesNotExist: - # Out lock expired and was reassigned + # Lock expired and was reassigned pass + except DatabaseError: + transaction.rollback() class Meta: verbose_name = _(u'lock') diff --git a/apps/navigation/api.py b/apps/navigation/api.py index e21f6b1050..42c7aa16ec 100644 --- a/apps/navigation/api.py +++ b/apps/navigation/api.py @@ -158,7 +158,11 @@ def bind_links(sources, links, menu_name=None, position=0): bound_links.setdefault(menu_name, {}) for source in sources: bound_links[menu_name].setdefault(source, {'links': []}) - bound_links[menu_name][source]['links'].extend(links) + try: + bound_links[menu_name][source]['links'].extend(links) + except TypeError: + # Try to see if links is a single link + bound_links[menu_name][source]['links'].append(links) def register_top_menu(name, link, position=None): diff --git a/apps/ocr/__init__.py b/apps/ocr/__init__.py index 95ba9a023f..b5447d8fa7 100644 --- a/apps/ocr/__init__.py +++ b/apps/ocr/__init__.py @@ -15,49 +15,42 @@ from documents.models import Document, DocumentVersion from maintenance.api import register_maintenance_links from project_tools.api import 
register_tool from acls.api import class_permissions -from scheduler.api import register_interval_job from statistics.api import register_statistics +from job_processor.models import JobQueue, JobType +from job_processor.exceptions import JobQueuePushError from .conf.settings import (AUTOMATIC_OCR, QUEUE_PROCESSING_INTERVAL) -from .models import DocumentQueue, QueueTransformation -from .tasks import task_process_document_queues +from .models import OCRProcessingSingleton +from .api import do_document_ocr from .permissions import PERMISSION_OCR_DOCUMENT from .exceptions import AlreadyQueued from . import models as ocr_models from .statistics import get_statistics +from .literals import OCR_QUEUE_NAME logger = logging.getLogger(__name__) +ocr_job_queue = None from .links import (submit_document, re_queue_multiple_document, - queue_document_multiple_delete, document_queue_disable, - document_queue_enable, all_document_ocr_cleanup, queue_document_list, - ocr_tool_link, setup_queue_transformation_list, - setup_queue_transformation_create, setup_queue_transformation_edit, - setup_queue_transformation_delete, submit_document_multiple) + queue_document_multiple_delete, ocr_disable, + ocr_enable, all_document_ocr_cleanup, ocr_log, + ocr_tool_link, submit_document_multiple) bind_links([Document], [submit_document]) -bind_links([DocumentQueue], [document_queue_disable, document_queue_enable, setup_queue_transformation_list]) -bind_links([QueueTransformation], [setup_queue_transformation_edit, setup_queue_transformation_delete]) - -register_multi_item_links(['queue_document_list'], [re_queue_multiple_document, queue_document_multiple_delete]) - -bind_links(['setup_queue_transformation_create', 'setup_queue_transformation_edit', 'setup_queue_transformation_delete', 'document_queue_disable', 'document_queue_enable', 'queue_document_list', 'setup_queue_transformation_list'], [queue_document_list], menu_name='secondary_menu') -bind_links(['setup_queue_transformation_edit', 
'setup_queue_transformation_delete', 'setup_queue_transformation_list', 'setup_queue_transformation_create'], [setup_queue_transformation_create], menu_name='sidebar') +bind_links([OCRProcessingSingleton], [ocr_disable, ocr_enable]) +#register_multi_item_links(['queue_document_list'], [re_queue_multiple_document, queue_document_multiple_delete]) register_maintenance_links([all_document_ocr_cleanup], namespace='ocr', title=_(u'OCR')) register_multi_item_links(['folder_view', 'search', 'results', 'index_instance_node_view', 'document_find_duplicates', 'document_type_document_list', 'document_group_view', 'document_list', 'document_list_recent'], [submit_document_multiple]) @transaction.commit_on_success -def create_default_queue(): +def create_ocr_job_queue(): + global ocr_job_queue try: - default_queue, created = DocumentQueue.objects.get_or_create(name='default') + ocr_job_queue, created = JobQueue.objects.get_or_create(name=OCR_QUEUE_NAME, defaults={'label': _('OCR'), 'unique_jobs': True}) except DatabaseError: transaction.rollback() - else: - if created: - default_queue.label = ugettext(u'Default') - default_queue.save() @receiver(post_save, dispatch_uid='document_post_save', sender=DocumentVersion) @@ -67,8 +60,8 @@ def document_post_save(sender, instance, **kwargs): if kwargs.get('created', False): if AUTOMATIC_OCR: try: - DocumentQueue.objects.queue_document(instance.document) - except AlreadyQueued: + instance.submit_for_ocr() + except JobQueuePushError: pass # Disabled because it appears Django execute signals using the same @@ -80,17 +73,15 @@ def document_post_save(sender, instance, **kwargs): # logger.debug('got call_queue signal: %s' % kwargs) # task_process_document_queues() - -@receiver(post_syncdb, dispatch_uid='create_default_queue', sender=ocr_models) -def create_default_queue_signal_handler(sender, **kwargs): - create_default_queue() - -register_interval_job('task_process_document_queues', _(u'Checks the OCR queue for pending documents.'), 
task_process_document_queues, seconds=QUEUE_PROCESSING_INTERVAL) - register_tool(ocr_tool_link) class_permissions(Document, [ PERMISSION_OCR_DOCUMENT, ]) -register_statistics(get_statistics) +#register_statistics(get_statistics) +create_ocr_job_queue() +ocr_job_type = JobType('ocr', _(u'OCR'), do_document_ocr) + +Document.add_to_class('submit_for_ocr', lambda document: ocr_job_queue.push(ocr_job_type, document_version_pk=document.latest_version.pk)) +DocumentVersion.add_to_class('submit_for_ocr', lambda document_version: ocr_job_queue.push(ocr_job_type, document_version_pk=document_version.pk)) diff --git a/apps/ocr/admin.py b/apps/ocr/admin.py deleted file mode 100644 index 0210faf751..0000000000 --- a/apps/ocr/admin.py +++ /dev/null @@ -1,20 +0,0 @@ -from __future__ import absolute_import - -from django.contrib import admin - -from .models import DocumentQueue, QueueDocument - - -class QueueDocumentInline(admin.StackedInline): - model = QueueDocument - extra = 1 - classes = ('collapse-open',) - allow_add = True - - -class DocumentQueueAdmin(admin.ModelAdmin): - inlines = [QueueDocumentInline] - list_display = ('name', 'label', 'state') - - -admin.site.register(DocumentQueue, DocumentQueueAdmin) diff --git a/apps/ocr/api.py b/apps/ocr/api.py index 33450b0862..5af659a4b3 100644 --- a/apps/ocr/api.py +++ b/apps/ocr/api.py @@ -12,7 +12,7 @@ from django.utils.importlib import import_module from common.conf.settings import TEMPORARY_DIRECTORY from converter.api import convert -from documents.models import DocumentPage +from documents.models import DocumentPage, DocumentVersion from .conf.settings import (TESSERACT_PATH, TESSERACT_LANGUAGE, UNPAPER_PATH) from .exceptions import TesseractError, UnpaperError @@ -81,25 +81,25 @@ def run_tesseract(input_filename, lang=None): return text -def do_document_ocr(queue_document): +def do_document_ocr(document_version_pk): """ Try first to extract text from document pages using the registered parser, if the parser fails or if 
there is no parser registered for the document mimetype do a visual OCR by calling tesseract """ - for document_page in queue_document.document.pages.all(): + document_version = DocumentVersion.objects.get(pk=document_version_pk) + for document_page in document_version.pages.all(): try: # Try to extract text by means of a parser parse_document_page(document_page) except (ParserError, ParserUnknownFile): # Fall back to doing visual OCR - ocr_transformations, warnings = queue_document.get_transformation_list() document_filepath = document_page.document.get_image_cache_name(page=document_page.page_number, version=document_page.document_version.pk) unpaper_output_filename = u'%s_unpaper_out_page_%s%s%s' % (document_page.document.uuid, document_page.page_number, os.extsep, UNPAPER_FILE_FORMAT) unpaper_output_filepath = os.path.join(TEMPORARY_DIRECTORY, unpaper_output_filename) - unpaper_input = convert(document_filepath, file_format=UNPAPER_FILE_FORMAT, transformations=ocr_transformations) + unpaper_input = convert(document_filepath, file_format=UNPAPER_FILE_FORMAT) execute_unpaper(input_filepath=unpaper_input, output_filepath=unpaper_output_filepath) #from PIL import Image, ImageOps diff --git a/apps/ocr/exceptions.py b/apps/ocr/exceptions.py index 32ec4c4c07..27d72374b9 100644 --- a/apps/ocr/exceptions.py +++ b/apps/ocr/exceptions.py @@ -21,3 +21,11 @@ class UnpaperError(Exception): class ReQueueError(Exception): pass + + +class OCRProcessingAlreadyDisabled(Exception): + pass + + +class OCRProcessingAlreadyEnabled(Exception): + pass diff --git a/apps/ocr/forms.py b/apps/ocr/forms.py deleted file mode 100644 index 19e8ea6805..0000000000 --- a/apps/ocr/forms.py +++ /dev/null @@ -1,21 +0,0 @@ -from __future__ import absolute_import - -from django import forms - -from .models import QueueTransformation - - -class QueueTransformationForm(forms.ModelForm): - class Meta: - model = QueueTransformation - - def __init__(self, *args, **kwargs): - super(QueueTransformationForm, 
self).__init__(*args, **kwargs) - self.fields['content_type'].widget = forms.HiddenInput() - self.fields['object_id'].widget = forms.HiddenInput() - - -class QueueTransformationForm_create(forms.ModelForm): - class Meta: - model = QueueTransformation - exclude = ('content_type', 'object_id') diff --git a/apps/ocr/links.py b/apps/ocr/links.py index 3baf133699..b3d34171f7 100644 --- a/apps/ocr/links.py +++ b/apps/ocr/links.py @@ -7,23 +7,26 @@ from navigation.api import Link from .permissions import (PERMISSION_OCR_DOCUMENT, PERMISSION_OCR_DOCUMENT_DELETE, PERMISSION_OCR_QUEUE_ENABLE_DISABLE, PERMISSION_OCR_CLEAN_ALL_PAGES) +from .models import OCRProcessingSingleton -submit_document = Link(text=_('submit to OCR queue'), view='submit_document', args='object.id', sprite='hourglass_add', permissions=[PERMISSION_OCR_DOCUMENT]) -submit_document_multiple = Link(text=_('submit to OCR queue'), view='submit_document_multiple', sprite='hourglass_add', permissions=[PERMISSION_OCR_DOCUMENT]) +def is_enabled(context): + return OCRProcessingSingleton.get().is_enabled() + +def is_disabled(context): + return not OCRProcessingSingleton.get().is_enabled() + + +ocr_log = Link(text=_(u'queue document list'), view='ocr_log', sprite='text', permissions=[PERMISSION_OCR_DOCUMENT]) +ocr_disable = Link(text=_(u'disable OCR processing'), view='ocr_disable', sprite='control_stop_blue', permissions=[PERMISSION_OCR_QUEUE_ENABLE_DISABLE], conditional_disable=is_disabled) +ocr_enable = Link(text=_(u'enable OCR processing'), view='ocr_enable', sprite='control_play_blue', permissions=[PERMISSION_OCR_QUEUE_ENABLE_DISABLE], conditional_disable=is_enabled) +submit_document = Link(text=_('submit to OCR queue'), view='submit_document', args='object.id', sprite='text_dropcaps', permissions=[PERMISSION_OCR_DOCUMENT]) +submit_document_multiple = Link(text=_('submit to OCR queue'), view='submit_document_multiple', sprite='text_dropcaps', permissions=[PERMISSION_OCR_DOCUMENT]) re_queue_document = 
Link(text=_('re-queue'), view='re_queue_document', args='object.id', sprite='hourglass_add', permissions=[PERMISSION_OCR_DOCUMENT]) re_queue_multiple_document = Link(text=_('re-queue'), view='re_queue_multiple_document', sprite='hourglass_add', permissions=[PERMISSION_OCR_DOCUMENT]) queue_document_delete = Link(text=_(u'delete'), view='queue_document_delete', args='object.id', sprite='hourglass_delete', permissions=[PERMISSION_OCR_DOCUMENT_DELETE]) queue_document_multiple_delete = Link(text=_(u'delete'), view='queue_document_multiple_delete', sprite='hourglass_delete', permissions=[PERMISSION_OCR_DOCUMENT_DELETE]) -document_queue_disable = Link(text=_(u'stop queue'), view='document_queue_disable', args='queue.id', sprite='control_stop_blue', permissions=[PERMISSION_OCR_QUEUE_ENABLE_DISABLE]) -document_queue_enable = Link(text=_(u'activate queue'), view='document_queue_enable', args='queue.id', sprite='control_play_blue', permissions=[PERMISSION_OCR_QUEUE_ENABLE_DISABLE]) all_document_ocr_cleanup = Link(text=_(u'clean up pages content'), view='all_document_ocr_cleanup', sprite='text_strikethrough', permissions=[PERMISSION_OCR_CLEAN_ALL_PAGES], description=_(u'Runs a language filter to remove common OCR mistakes from document pages content.')) -queue_document_list = Link(text=_(u'queue document list'), view='queue_document_list', sprite='hourglass', permissions=[PERMISSION_OCR_DOCUMENT]) -ocr_tool_link = Link(text=_(u'OCR'), view='queue_document_list', sprite='hourglass', icon='text.png', permissions=[PERMISSION_OCR_DOCUMENT], children_view_regex=[r'queue_', r'document_queue']) - -setup_queue_transformation_list = Link(text=_(u'transformations'), view='setup_queue_transformation_list', args='queue.pk', sprite='shape_move_front') -setup_queue_transformation_create = Link(text=_(u'add transformation'), view='setup_queue_transformation_create', args='queue.pk', sprite='shape_square_add') -setup_queue_transformation_edit = Link(text=_(u'edit'), 
view='setup_queue_transformation_edit', args='transformation.pk', sprite='shape_square_edit') -setup_queue_transformation_delete = Link(text=_(u'delete'), view='setup_queue_transformation_delete', args='transformation.pk', sprite='shape_square_delete') +ocr_tool_link = Link(text=_(u'OCR'), view='ocr_log', sprite='hourglass', icon='text.png', permissions=[PERMISSION_OCR_DOCUMENT]) # children_view_regex=[r'queue_', r'document_queue']) diff --git a/apps/ocr/literals.py b/apps/ocr/literals.py index 946c063e38..b7d10f8615 100644 --- a/apps/ocr/literals.py +++ b/apps/ocr/literals.py @@ -1,25 +1,16 @@ from django.utils.translation import ugettext_lazy as _ -DOCUMENTQUEUE_STATE_STOPPED = 's' -DOCUMENTQUEUE_STATE_ACTIVE = 'a' +OCR_STATE_DISABLED = 'd' +OCR_STATE_ENABLED = 'e' -DOCUMENTQUEUE_STATE_CHOICES = ( - (DOCUMENTQUEUE_STATE_STOPPED, _(u'stopped')), - (DOCUMENTQUEUE_STATE_ACTIVE, _(u'active')), -) - - -QUEUEDOCUMENT_STATE_PENDING = 'p' -QUEUEDOCUMENT_STATE_PROCESSING = 'i' -QUEUEDOCUMENT_STATE_ERROR = 'e' - -QUEUEDOCUMENT_STATE_CHOICES = ( - (QUEUEDOCUMENT_STATE_PENDING, _(u'pending')), - (QUEUEDOCUMENT_STATE_PROCESSING, _(u'processing')), - (QUEUEDOCUMENT_STATE_ERROR, _(u'error')), +OCR_STATE_CHOICES = ( + (OCR_STATE_DISABLED, _(u'disabled')), + (OCR_STATE_ENABLED, _(u'enabled')), ) DEFAULT_OCR_FILE_FORMAT = u'tiff' DEFAULT_OCR_FILE_EXTENSION = u'tif' UNPAPER_FILE_FORMAT = u'ppm' + +OCR_QUEUE_NAME = 'ocr' diff --git a/apps/ocr/managers.py b/apps/ocr/managers.py index b4596356d6..8e3946e0b5 100644 --- a/apps/ocr/managers.py +++ b/apps/ocr/managers.py @@ -2,19 +2,19 @@ from __future__ import absolute_import from django.db import models -from .exceptions import AlreadyQueued +#from .exceptions import AlreadyQueued -class DocumentQueueManager(models.Manager): - ''' - Module manager class to handle adding documents to an OCR document - queue - ''' - def queue_document(self, document, queue_name='default'): - document_queue = self.model.objects.get(name=queue_name) - if 
document_queue.queuedocument_set.filter(document=document): - raise AlreadyQueued +class OCRProcessingManager(models.Manager): + """ + Module manager class to handle adding documents to an OCR queue + """ + def queue_document(self, document): + pass + #document_queue = self.model.objects.get(name=queue_name) + #if document_queue.queuedocument_set.filter(document_version=document.latest_version): + # raise AlreadyQueued - document_queue.queuedocument_set.create(document=document, delay=True) + #document_queue.queuedocument_set.create(document_version=document.latest_version, delay=True) - return document_queue + #return document_queue diff --git a/apps/ocr/migrations/0002_auto__del_documentqueue__del_queuedocument__del_queuetransformation__a.py b/apps/ocr/migrations/0002_auto__del_documentqueue__del_queuedocument__del_queuetransformation__a.py new file mode 100644 index 0000000000..8317488788 --- /dev/null +++ b/apps/ocr/migrations/0002_auto__del_documentqueue__del_queuedocument__del_queuetransformation__a.py @@ -0,0 +1,77 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Deleting model 'DocumentQueue' + db.delete_table('ocr_documentqueue') + + # Deleting model 'QueueDocument' + db.delete_table('ocr_queuedocument') + + # Deleting model 'QueueTransformation' + db.delete_table('ocr_queuetransformation') + + # Adding model 'OCRProcessingSingleton' + db.create_table('ocr_ocrprocessingsingleton', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('lock_id', self.gf('django.db.models.fields.CharField')(default=1, unique=True, max_length=1)), + ('state', self.gf('django.db.models.fields.CharField')(default='a', max_length=4)), + )) + db.send_create_signal('ocr', ['OCRProcessingSingleton']) + + + def backwards(self, orm): + # Adding model 'DocumentQueue' + 
db.create_table('ocr_documentqueue', ( + ('state', self.gf('django.db.models.fields.CharField')(default='a', max_length=4)), + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('name', self.gf('django.db.models.fields.CharField')(max_length=64, unique=True)), + ('label', self.gf('django.db.models.fields.CharField')(max_length=64)), + )) + db.send_create_signal('ocr', ['DocumentQueue']) + + # Adding model 'QueueDocument' + db.create_table('ocr_queuedocument', ( + ('delay', self.gf('django.db.models.fields.BooleanField')(default=False)), + ('state', self.gf('django.db.models.fields.CharField')(default='p', max_length=4)), + ('result', self.gf('django.db.models.fields.TextField')(null=True, blank=True)), + ('datetime_submitted', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, blank=True, db_index=True)), + ('document_queue', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['ocr.DocumentQueue'])), + ('document_version', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['documents.DocumentVersion'])), + ('document', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['documents.Document'])), + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('node_name', self.gf('django.db.models.fields.CharField')(max_length=32, null=True, blank=True)), + )) + db.send_create_signal('ocr', ['QueueDocument']) + + # Adding model 'QueueTransformation' + db.create_table('ocr_queuetransformation', ( + ('object_id', self.gf('django.db.models.fields.PositiveIntegerField')()), + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('arguments', self.gf('django.db.models.fields.TextField')(null=True, blank=True)), + ('content_type', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['contenttypes.ContentType'])), + ('order', self.gf('django.db.models.fields.PositiveIntegerField')(default=0, null=True, blank=True, db_index=True)), + ('transformation', 
self.gf('django.db.models.fields.CharField')(max_length=128)), + )) + db.send_create_signal('ocr', ['QueueTransformation']) + + # Deleting model 'OCRProcessingSingleton' + db.delete_table('ocr_ocrprocessingsingleton') + + + models = { + 'ocr.ocrprocessingsingleton': { + 'Meta': {'object_name': 'OCRProcessingSingleton'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'a'", 'max_length': '4'}) + } + } + + complete_apps = ['ocr'] \ No newline at end of file diff --git a/apps/ocr/models.py b/apps/ocr/models.py index 8a77d12928..9898db7060 100644 --- a/apps/ocr/models.py +++ b/apps/ocr/models.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from ast import literal_eval -from datetime import datetime +import datetime from django.db import models from django.utils.translation import ugettext_lazy as _ @@ -11,113 +11,45 @@ from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes import generic from django.core.exceptions import ValidationError -from documents.models import Document +from common.models import Singleton +from documents.models import Document, DocumentVersion from converter.api import get_available_transformations_choices from sources.managers import SourceTransformationManager -from .literals import (DOCUMENTQUEUE_STATE_CHOICES, - QUEUEDOCUMENT_STATE_PENDING, QUEUEDOCUMENT_STATE_CHOICES, - QUEUEDOCUMENT_STATE_PROCESSING, DOCUMENTQUEUE_STATE_ACTIVE) -from .managers import DocumentQueueManager -from .exceptions import ReQueueError +from .literals import (OCR_STATE_CHOICES, OCR_STATE_ENABLED, + OCR_STATE_DISABLED) +from .managers import OCRProcessingManager +from .exceptions import (ReQueueError, OCRProcessingAlreadyDisabled, + OCRProcessingAlreadyEnabled) -class DocumentQueue(models.Model): - name = 
models.CharField(max_length=64, unique=True, verbose_name=_(u'name')) - label = models.CharField(max_length=64, verbose_name=_(u'label')) +class OCRProcessingSingleton(Singleton): state = models.CharField(max_length=4, - choices=DOCUMENTQUEUE_STATE_CHOICES, - default=DOCUMENTQUEUE_STATE_ACTIVE, + choices=OCR_STATE_CHOICES, + default=OCR_STATE_ENABLED, verbose_name=_(u'state')) - objects = DocumentQueueManager() - - class Meta: - verbose_name = _(u'document queue') - verbose_name_plural = _(u'document queues') + #objects = AnonymousUserSingletonManager() def __unicode__(self): - return self.label + return ugettext('OCR processing') + def disable(self): + if self.state == OCR_STATE_DISABLED: + raise OCRProcessingAlreadyDisabled + + self.state = OCR_STATE_DISABLED + self.save() -class QueueDocument(models.Model): - document_queue = models.ForeignKey(DocumentQueue, verbose_name=_(u'document queue')) - document = models.ForeignKey(Document, verbose_name=_(u'document')) - datetime_submitted = models.DateTimeField(verbose_name=_(u'date time submitted'), auto_now_add=True, db_index=True) - delay = models.BooleanField(verbose_name=_(u'delay ocr'), default=False) - state = models.CharField(max_length=4, - choices=QUEUEDOCUMENT_STATE_CHOICES, - default=QUEUEDOCUMENT_STATE_PENDING, - verbose_name=_(u'state')) - result = models.TextField(blank=True, null=True, verbose_name=_(u'result')) - node_name = models.CharField(max_length=32, verbose_name=_(u'node name'), blank=True, null=True) + def enable(self): + if self.state == OCR_STATE_ENABLED: + raise OCRProcessingAlreadyEnabled + + self.state = OCR_STATE_ENABLED + self.save() + + def is_enabled(self): + return self.state == OCR_STATE_ENABLED class Meta: - ordering = ('datetime_submitted',) - verbose_name = _(u'queue document') - verbose_name_plural = _(u'queue documents') - - def get_transformation_list(self): - return QueueTransformation.transformations.get_for_object_as_list(self) - - def requeue(self): - if self.state == 
QUEUEDOCUMENT_STATE_PROCESSING: - raise ReQueueError - else: - self.datetime_submitted = datetime.now() - self.state = QUEUEDOCUMENT_STATE_PENDING - self.delay = False - self.result = None - self.node_name = None - self.save() - - def __unicode__(self): - try: - return unicode(self.document) - except ObjectDoesNotExist: - return ugettext(u'Missing document.') - - -class ArgumentsValidator(object): - message = _(u'Enter a valid value.') - code = 'invalid' - - def __init__(self, message=None, code=None): - if message is not None: - self.message = message - if code is not None: - self.code = code - - def __call__(self, value): - ''' - Validates that the input evaluates correctly. - ''' - value = value.strip() - try: - literal_eval(value) - except (ValueError, SyntaxError): - raise ValidationError(self.message, code=self.code) - - -class QueueTransformation(models.Model): - ''' - Model that stores the transformation and transformation arguments - for a given document queue - ''' - content_type = models.ForeignKey(ContentType) - object_id = models.PositiveIntegerField() - content_object = generic.GenericForeignKey('content_type', 'object_id') - order = models.PositiveIntegerField(default=0, blank=True, null=True, verbose_name=_(u'order'), db_index=True) - transformation = models.CharField(choices=get_available_transformations_choices(), max_length=128, verbose_name=_(u'transformation')) - arguments = models.TextField(blank=True, null=True, verbose_name=_(u'arguments'), help_text=_(u'Use dictionaries to indentify arguments, example: %s') % u'{\'degrees\':90}', validators=[ArgumentsValidator()]) - - objects = models.Manager() - transformations = SourceTransformationManager() - - def __unicode__(self): - return self.get_transformation_display() - - class Meta: - ordering = ('order',) - verbose_name = _(u'document queue transformation') - verbose_name_plural = _(u'document queue transformations') + verbose_name = verbose_name_plural = _(u'OCR processing properties') diff 
--git a/apps/ocr/permissions.py b/apps/ocr/permissions.py index f74f1ec267..17f7a5de7a 100644 --- a/apps/ocr/permissions.py +++ b/apps/ocr/permissions.py @@ -7,6 +7,6 @@ from permissions.models import Permission, PermissionNamespace ocr_namespace = PermissionNamespace('ocr', _(u'OCR')) PERMISSION_OCR_DOCUMENT = Permission.objects.register(ocr_namespace, 'ocr_document', _(u'Submit documents for OCR')) PERMISSION_OCR_DOCUMENT_DELETE = Permission.objects.register(ocr_namespace, 'ocr_document_delete', _(u'Delete documents from OCR queue')) -PERMISSION_OCR_QUEUE_ENABLE_DISABLE = Permission.objects.register(ocr_namespace, 'ocr_queue_enable_disable', _(u'Can enable/disable the OCR queue')) +PERMISSION_OCR_QUEUE_ENABLE_DISABLE = Permission.objects.register(ocr_namespace, 'ocr_queue_enable_disable', _(u'Can enable/disable the OCR processing')) PERMISSION_OCR_CLEAN_ALL_PAGES = Permission.objects.register(ocr_namespace, 'ocr_clean_all_pages', _(u'Can execute the OCR clean up on all document pages')) PERMISSION_OCR_QUEUE_EDIT = Permission.objects.register(ocr_namespace, 'ocr_queue_edit', _(u'Can edit an OCR queue properties')) diff --git a/apps/ocr/statistics.py b/apps/ocr/statistics.py index 590075c719..bef113303e 100644 --- a/apps/ocr/statistics.py +++ b/apps/ocr/statistics.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from django.utils.translation import ugettext as _ -from .models import DocumentQueue, QueueDocument +#from .models import DocumentQueue, QueueDocument def get_statistics(): diff --git a/apps/ocr/tasks.py b/apps/ocr/tasks.py deleted file mode 100644 index 0a0d8ab1e6..0000000000 --- a/apps/ocr/tasks.py +++ /dev/null @@ -1,75 +0,0 @@ -from __future__ import absolute_import - -from datetime import timedelta, datetime -import platform -import logging - -from django.db.models import Q - -from job_processor.api import process_job -from lock_manager import Lock, LockError - -from .api import do_document_ocr -from .literals import 
(QUEUEDOCUMENT_STATE_PENDING, - QUEUEDOCUMENT_STATE_PROCESSING, DOCUMENTQUEUE_STATE_ACTIVE, - QUEUEDOCUMENT_STATE_ERROR) -from .models import QueueDocument, DocumentQueue -from .conf.settings import NODE_CONCURRENT_EXECUTION, REPLICATION_DELAY - -LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes -# TODO: Tie LOCK_EXPIRATION with hard task timeout - -logger = logging.getLogger(__name__) - - -def task_process_queue_document(queue_document_id): - lock_id = u'task_proc_queue_doc-%d' % queue_document_id - try: - logger.debug('trying to acquire lock: %s' % lock_id) - lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE) - logger.debug('acquired lock: %s' % lock_id) - queue_document = QueueDocument.objects.get(pk=queue_document_id) - queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING - queue_document.node_name = platform.node() - queue_document.save() - try: - do_document_ocr(queue_document) - queue_document.delete() - except Exception, e: - queue_document.state = QUEUEDOCUMENT_STATE_ERROR - queue_document.result = e - queue_document.save() - - lock.release() - except LockError: - logger.debug('unable to obtain lock') - pass - - -def task_process_document_queues(): - logger.debug('executed') - # TODO: reset_orphans() - q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING) - q_delayed = Q(delay=True) - q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY)) - for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE): - current_local_processing_count = QueueDocument.objects.filter( - state=QUEUEDOCUMENT_STATE_PROCESSING).filter( - node_name=platform.node()).count() - if current_local_processing_count < NODE_CONCURRENT_EXECUTION: - try: - oldest_queued_document_qs = document_queue.queuedocument_set.filter( - (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval)) - - if oldest_queued_document_qs: - oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0] - 
process_job(task_process_queue_document, oldest_queued_document.pk) - except Exception, e: - logger.error('unhandled exception: %s' % e) - finally: - # Don't process anymore from this queryset, might be stale - break - else: - logger.debug('already processing maximun') - else: - logger.debug('nothing to process') diff --git a/apps/ocr/urls.py b/apps/ocr/urls.py index d77be818f8..91b3e2d85f 100644 --- a/apps/ocr/urls.py +++ b/apps/ocr/urls.py @@ -1,21 +1,17 @@ from django.conf.urls.defaults import patterns, url urlpatterns = patterns('ocr.views', + url(r'^log/$', 'ocr_log', (), 'ocr_log'), + + url(r'^processing/enable/$', 'ocr_enable', (), 'ocr_enable'), + url(r'^processing/disable/$', 'ocr_disable', (), 'ocr_disable'), + url(r'^document/(?P\d+)/submit/$', 'submit_document', (), 'submit_document'), url(r'^document/multiple/submit/$', 'submit_document_multiple', (), 'submit_document_multiple'), - url(r'^queue/document/list/$', 'queue_document_list', (), 'queue_document_list'), - url(r'^queue/document/(?P\d+)/delete/$', 'queue_document_delete', (), 'queue_document_delete'), - url(r'^queue/document/multiple/delete/$', 'queue_document_multiple_delete', (), 'queue_document_multiple_delete'), - url(r'^queue/document/(?P\d+)/re-queue/$', 're_queue_document', (), 're_queue_document'), - url(r'^queue/document/multiple/re-queue/$', 're_queue_multiple_document', (), 're_queue_multiple_document'), - - url(r'^queue/(?P\d+)/enable/$', 'document_queue_enable', (), 'document_queue_enable'), - url(r'^queue/(?P\d+)/disable/$', 'document_queue_disable', (), 'document_queue_disable'), + #url(r'^queue/document/(?P\d+)/delete/$', 'queue_document_delete', (), 'queue_document_delete'), + #url(r'^queue/document/multiple/delete/$', 'queue_document_multiple_delete', (), 'queue_document_multiple_delete'), + #url(r'^queue/document/(?P\d+)/re-queue/$', 're_queue_document', (), 're_queue_document'), + #url(r'^queue/document/multiple/re-queue/$', 're_queue_multiple_document', (), 
're_queue_multiple_document'), url(r'^document/all/clean_up/$', 'all_document_ocr_cleanup', (), 'all_document_ocr_cleanup'), - - url(r'^queue/(?P\d+)/transformation/list/$', 'setup_queue_transformation_list', (), 'setup_queue_transformation_list'), - url(r'^queue/(?P\w+)/transformation/create/$', 'setup_queue_transformation_create', (), 'setup_queue_transformation_create'), - url(r'^queue/transformation/(?P\w+)/edit/$', 'setup_queue_transformation_edit', (), 'setup_queue_transformation_edit'), - url(r'^queue/transformation/(?P\w+)/delete/$', 'setup_queue_transformation_delete', (), 'setup_queue_transformation_delete'), ) diff --git a/apps/ocr/views.py b/apps/ocr/views.py index 3af18eb59c..ba70b072f6 100644 --- a/apps/ocr/views.py +++ b/apps/ocr/views.py @@ -14,56 +14,114 @@ from documents.models import Document from documents.widgets import document_link, document_thumbnail from common.utils import encapsulate from acls.models import AccessEntry +from job_processor.exceptions import JobQueuePushError from .permissions import (PERMISSION_OCR_DOCUMENT, PERMISSION_OCR_DOCUMENT_DELETE, PERMISSION_OCR_QUEUE_ENABLE_DISABLE, PERMISSION_OCR_CLEAN_ALL_PAGES, PERMISSION_OCR_QUEUE_EDIT) -from .models import DocumentQueue, QueueDocument, QueueTransformation -from .literals import (QUEUEDOCUMENT_STATE_PROCESSING, - DOCUMENTQUEUE_STATE_ACTIVE, DOCUMENTQUEUE_STATE_STOPPED) -from .exceptions import AlreadyQueued, ReQueueError +from .models import OCRProcessingSingleton +from .exceptions import (AlreadyQueued, ReQueueError, OCRProcessingAlreadyDisabled, + OCRProcessingAlreadyEnabled) from .api import clean_pages -from .forms import QueueTransformationForm, QueueTransformationForm_create +from . 
import ocr_job_queue, ocr_job_type -def queue_document_list(request, queue_name='default'): +def ocr_log(request): Permission.objects.check_permissions(request.user, [PERMISSION_OCR_DOCUMENT]) - document_queue = get_object_or_404(DocumentQueue, name=queue_name) - - return object_list( - request, - queryset=document_queue.queuedocument_set.all(), - template_name='generic_list.html', - extra_context={ - 'title': _(u'documents in queue: %s') % document_queue, - 'hide_object': True, - 'queue': document_queue, - 'object_name': _(u'document queue'), - 'navigation_object_name': 'queue', - 'list_object_variable_name': 'queue_document', - 'extra_columns': [ - {'name': 'document', 'attribute': encapsulate(lambda x: document_link(x.document) if hasattr(x, 'document') else _(u'Missing document.'))}, - {'name': _(u'thumbnail'), 'attribute': encapsulate(lambda x: document_thumbnail(x.document))}, - {'name': 'submitted', 'attribute': encapsulate(lambda x: unicode(x.datetime_submitted).split('.')[0]), 'keep_together':True}, - {'name': 'delay', 'attribute': 'delay'}, - {'name': 'state', 'attribute': encapsulate(lambda x: x.get_state_display())}, - {'name': 'node', 'attribute': 'node_name'}, - {'name': 'result', 'attribute': 'result'}, - ], - 'multi_select_as_buttons': True, - 'sidebar_subtemplates_list': [ - { - 'name': 'generic_subtemplate.html', - 'context': { - 'side_bar': True, - 'title': _(u'document queue properties'), - 'content': _(u'Current state: %s') % document_queue.get_state_display(), - } + context = { + 'queue': OCRProcessingSingleton.get(), + 'object_name': _(u'OCR processing'), # TODO fix, not working + 'navigation_object_name': 'queue', + 'object_list': [], + 'title': _(u'OCR log items'), + #'hide_object': True, + #'hide_link': True, + 'extra_columns': [ + {'name': _(u'document'), 'attribute': encapsulate(lambda x: document_link(x.document_version.document) if hasattr(x, 'document_version') else _(u'Missing document.'))}, + {'name': _(u'version'), 'attribute': 
'document_version'}, + {'name': _(u'thumbnail'), 'attribute': encapsulate(lambda x: document_thumbnail(x.document_version.document))}, + {'name': _('submitted'), 'attribute': encapsulate(lambda x: unicode(x.datetime_submitted).split('.')[0]), 'keep_together':True}, + #{'name': _('delay'), 'attribute': 'delay'}, + #{'name': _('state'), 'attribute': encapsulate(lambda x: x.get_state_display())}, + #{'name': _('node'), 'attribute': 'node_name'}, + {'name': _('result'), 'attribute': 'result'}, + ], + 'multi_select_as_buttons': True, + 'sidebar_subtemplates_list': [ + { + 'name': 'generic_subtemplate.html', + 'context': { + 'side_bar': True, + 'title': _(u'OCR processing properties'), + 'content': _(u'Current state: %s') % OCRProcessingSingleton.get().get_state_display(), } - ] - }, - ) + } + ] + } + + return render_to_response('generic_list.html', context, + context_instance=RequestContext(request)) + + # 'queue': document_queue, + # 'object_name': _(u'document queue'), + # 'navigation_object_name': 'queue', + # 'list_object_variable_name': 'queue_document', + # }, + #) + + +def ocr_disable(request): + Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_ENABLE_DISABLE]) + + next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) + previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) + + if request.method == 'POST': + try: + OCRProcessingSingleton.get().disable() + except OCRProcessingAlreadyDisabled: + messages.warning(request, _(u'OCR processing already disabled.')) + return HttpResponseRedirect(previous) + else: + messages.success(request, _(u'OCR processing disabled successfully.')) + return HttpResponseRedirect(next) + + return render_to_response('generic_confirm.html', { + 'queue': OCRProcessingSingleton.get(), + 'navigation_object_name': 'queue', + 'title': _(u'Are you sure you wish to disable OCR processing?'), + 'next': next, + 'previous': 
previous, + 'form_icon': u'control_stop_blue.png', + }, context_instance=RequestContext(request)) + + +def ocr_enable(request): + Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_ENABLE_DISABLE]) + + next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) + previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) + + if request.method == 'POST': + try: + OCRProcessingSingleton.get().enable() + except OCRProcessingAlreadyEnabled: + messages.warning(request, _(u'OCR processing already enabled.')) + return HttpResponseRedirect(previous) + else: + messages.success(request, _(u'OCR processing enabled successfully.')) + return HttpResponseRedirect(next) + + return render_to_response('generic_confirm.html', { + 'queue': OCRProcessingSingleton.get(), + 'navigation_object_name': 'queue', + 'title': _(u'Are you sure you wish to enable OCR processing?'), + 'next': next, + 'previous': previous, + 'form_icon': u'control_play_blue.png', + }, context_instance=RequestContext(request)) + def queue_document_delete(request, queue_document_id=None, queue_document_id_list=None): @@ -136,15 +194,16 @@ def submit_document(request, document_id): def submit_document_to_queue(request, document, post_submit_redirect=None): - ''' + """ This view is meant to be reusable - ''' + """ try: - document_queue = DocumentQueue.objects.queue_document(document) - messages.success(request, _(u'Document: %(document)s was added to the OCR queue: %(queue)s.') % { - 'document': document, 'queue': document_queue.label}) - except AlreadyQueued: + document.submit_for_ocr() + #ocr_job_queue.push(ocr_job_type, document_version_pk=document.latest_version.pk) + messages.success(request, _(u'Document: %(document)s was added to the OCR queue successfully.') % { + 'document': document}) + except JobQueuePushError: messages.warning(request, _(u'Document: %(document)s is already queued.') % { 'document': 
document}) except Exception, e: @@ -175,12 +234,12 @@ def re_queue_document(request, queue_document_id=None, queue_document_id_list=No messages.success( request, _(u'Document: %(document)s was re-queued to the OCR queue: %(queue)s') % { - 'document': queue_document.document, + 'document': queue_document.document_version.document, 'queue': queue_document.document_queue.label } ) except Document.DoesNotExist: - messages.error(request, _(u'Document id#: %d, no longer exists.') % queue_document.document_id) + messages.error(request, _(u'Document no longer in queue.')) except ReQueueError: messages.warning( request, @@ -208,60 +267,6 @@ def re_queue_multiple_document(request): return re_queue_document(request, queue_document_id_list=request.GET.get('id_list', [])) -def document_queue_disable(request, document_queue_id): - Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_ENABLE_DISABLE]) - - next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) - previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) - document_queue = get_object_or_404(DocumentQueue, pk=document_queue_id) - - if document_queue.state == DOCUMENTQUEUE_STATE_STOPPED: - messages.warning(request, _(u'Document queue: %s, already stopped.') % document_queue) - return HttpResponseRedirect(previous) - - if request.method == 'POST': - document_queue.state = DOCUMENTQUEUE_STATE_STOPPED - document_queue.save() - messages.success(request, _(u'Document queue: %s, stopped successfully.') % document_queue) - return HttpResponseRedirect(next) - - return render_to_response('generic_confirm.html', { - 'queue': document_queue, - 'navigation_object_name': 'queue', - 'title': _(u'Are you sure you wish to disable document queue: %s') % document_queue, - 'next': next, - 'previous': previous, - 'form_icon': u'control_stop_blue.png', - }, context_instance=RequestContext(request)) - - -def 
document_queue_enable(request, document_queue_id): - Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_ENABLE_DISABLE]) - - next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) - previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) - document_queue = get_object_or_404(DocumentQueue, pk=document_queue_id) - - if document_queue.state == DOCUMENTQUEUE_STATE_ACTIVE: - messages.warning(request, _(u'Document queue: %s, already active.') % document_queue) - return HttpResponseRedirect(previous) - - if request.method == 'POST': - document_queue.state = DOCUMENTQUEUE_STATE_ACTIVE - document_queue.save() - messages.success(request, _(u'Document queue: %s, activated successfully.') % document_queue) - return HttpResponseRedirect(next) - - return render_to_response('generic_confirm.html', { - 'queue': document_queue, - 'navigation_object_name': 'queue', - 'title': _(u'Are you sure you wish to activate document queue: %s') % document_queue, - 'next': next, - 'previous': previous, - 'form_icon': u'control_play_blue.png', - }, context_instance=RequestContext(request)) - - def all_document_ocr_cleanup(request): Permission.objects.check_permissions(request.user, [PERMISSION_OCR_CLEAN_ALL_PAGES]) @@ -297,126 +302,3 @@ def display_link(obj): return u''.join(output) else: return obj - - -# Setup views -def setup_queue_transformation_list(request, document_queue_id): - Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_EDIT]) - - document_queue = get_object_or_404(DocumentQueue, pk=document_queue_id) - - context = { - 'object_list': QueueTransformation.transformations.get_for_object(document_queue), - 'title': _(u'transformations for: %s') % document_queue, - 'queue': document_queue, - 'object_name': _(u'document queue'), - 'navigation_object_name': 'queue', - 'list_object_variable_name': 'transformation', - 'extra_columns': [ - {'name': 
_(u'order'), 'attribute': 'order'}, - {'name': _(u'transformation'), 'attribute': encapsulate(lambda x: x.get_transformation_display())}, - {'name': _(u'arguments'), 'attribute': 'arguments'} - ], - 'hide_link': True, - 'hide_object': True, - } - - return render_to_response('generic_list.html', context, - context_instance=RequestContext(request)) - - -def setup_queue_transformation_edit(request, transformation_id): - Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_EDIT]) - - transformation = get_object_or_404(QueueTransformation, pk=transformation_id) - redirect_view = reverse('setup_queue_transformation_list', args=[transformation.content_object.pk]) - next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', redirect_view))) - - if request.method == 'POST': - form = QueueTransformationForm(instance=transformation, data=request.POST) - if form.is_valid(): - try: - form.save() - messages.success(request, _(u'Queue transformation edited successfully')) - return HttpResponseRedirect(next) - except Exception, e: - messages.error(request, _(u'Error editing queue transformation; %s') % e) - else: - form = QueueTransformationForm(instance=transformation) - - return render_to_response('generic_form.html', { - 'title': _(u'Edit transformation: %s') % transformation, - 'form': form, - 'queue': transformation.content_object, - 'transformation': transformation, - 'navigation_object_list': [ - {'object': 'queue', 'name': _(u'document queue')}, - {'object': 'transformation', 'name': _(u'transformation')} - ], - 'next': next, - }, - context_instance=RequestContext(request)) - - -def setup_queue_transformation_delete(request, transformation_id): - Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_EDIT]) - - transformation = get_object_or_404(QueueTransformation, pk=transformation_id) - redirect_view = reverse('setup_queue_transformation_list', args=[transformation.content_object.pk]) - previous = 
request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', redirect_view))) - - if request.method == 'POST': - try: - transformation.delete() - messages.success(request, _(u'Queue transformation deleted successfully.')) - except Exception, e: - messages.error(request, _(u'Error deleting queue transformation; %(error)s') % { - 'error': e} - ) - return HttpResponseRedirect(redirect_view) - - return render_to_response('generic_confirm.html', { - 'delete_view': True, - 'transformation': transformation, - 'queue': transformation.content_object, - 'navigation_object_list': [ - {'object': 'queue', 'name': _(u'document queue')}, - {'object': 'transformation', 'name': _(u'transformation')} - ], - 'title': _(u'Are you sure you wish to delete queue transformation "%(transformation)s"') % { - 'transformation': transformation.get_transformation_display(), - }, - 'previous': previous, - 'form_icon': u'shape_square_delete.png', - }, - context_instance=RequestContext(request)) - - -def setup_queue_transformation_create(request, document_queue_id): - Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_EDIT]) - - document_queue = get_object_or_404(DocumentQueue, pk=document_queue_id) - - redirect_view = reverse('setup_queue_transformation_list', args=[document_queue.pk]) - - if request.method == 'POST': - form = QueueTransformationForm_create(request.POST) - if form.is_valid(): - try: - queue_tranformation = form.save(commit=False) - queue_tranformation.content_object = document_queue - queue_tranformation.save() - messages.success(request, _(u'Queue transformation created successfully')) - return HttpResponseRedirect(redirect_view) - except Exception, e: - messages.error(request, _(u'Error creating queue transformation; %s') % e) - else: - form = QueueTransformationForm_create() - - return render_to_response('generic_form.html', { - 'form': form, - 'queue': document_queue, - 'object_name': _(u'document queue'), - 
'navigation_object_name': 'queue', - 'title': _(u'Create new transformation for queue: %s') % document_queue, - }, context_instance=RequestContext(request)) diff --git a/apps/scheduler/__init__.py b/apps/scheduler/__init__.py index 18d7ea6e9f..25fcbf6190 100644 --- a/apps/scheduler/__init__.py +++ b/apps/scheduler/__init__.py @@ -2,43 +2,31 @@ from __future__ import absolute_import import logging import atexit +import sys -from .runtime import scheduler - -from django.db.models.signals import post_syncdb -from django.dispatch import receiver - -from south.signals import pre_migrate - -from signaler.signals import pre_collectstatic from project_tools.api import register_tool +from navigation.api import bind_links + +from .links import scheduler_tool_link, scheduler_list, job_list +from .literals import SHUTDOWN_COMMANDS +from .api import LocalScheduler -from .links import job_list - logger = logging.getLogger(__name__) -@receiver(post_syncdb, dispatch_uid='scheduler_shutdown_post_syncdb') -def scheduler_shutdown_post_syncdb(sender, **kwargs): - logger.debug('Scheduler shut down on post syncdb signal') - scheduler.shutdown() - - -@receiver(pre_collectstatic, dispatch_uid='sheduler_shutdown_pre_collectstatic') -def sheduler_shutdown_pre_collectstatic(sender, **kwargs): - logger.debug('Scheduler shut down on collectstatic signal') - scheduler.shutdown() - - -@receiver(pre_migrate, dispatch_uid='sheduler_shutdown_pre_migrate') -def sheduler_shutdown_pre_migrate(sender, **kwargs): - logger.debug('Scheduler shut down on pre_migrate signal') - scheduler.shutdown() - def schedule_shutdown_on_exit(): - logger.debug('Scheduler shut down on exit') - scheduler.shutdown() + logger.debug('Schedulers shut down on exit') + LocalScheduler.shutdown_all() -register_tool(job_list) +if any([command in sys.argv for command in SHUTDOWN_COMMANDS]): + logger.debug('Schedulers shut down on SHUTDOWN_COMMAND') + # Shutdown any scheduler already running + LocalScheduler.shutdown_all() + # 
Prevent any new scheduler afterwards to start + LocalScheduler.lockdown() + +register_tool(scheduler_tool_link) atexit.register(schedule_shutdown_on_exit) +bind_links([LocalScheduler, 'scheduler_list', 'job_list'], scheduler_list, menu_name='secondary_menu') +bind_links([LocalScheduler], job_list) diff --git a/apps/scheduler/api.py b/apps/scheduler/api.py index 6ce39bc3c0..aeebd6fbec 100644 --- a/apps/scheduler/api.py +++ b/apps/scheduler/api.py @@ -1,30 +1,147 @@ from __future__ import absolute_import -from .runtime import scheduler -from .exceptions import AlreadyScheduled +import logging -registered_jobs = {} +from apscheduler.scheduler import Scheduler as OriginalScheduler + +from django.utils.translation import ugettext_lazy as _ + +from .exceptions import AlreadyScheduled, UnknownJobClass + +logger = logging.getLogger(__name__) -def register_interval_job(name, title, func, weeks=0, days=0, hours=0, minutes=0, - seconds=0, start_date=None, args=None, - kwargs=None, job_name=None, **options): +class SchedulerJobBase(object): + job_type = u'' - if name in registered_jobs: - raise AlreadyScheduled + def __init__(self, name, label, function, *args, **kwargs): + self.scheduler = None + self.name = name + self.label = label + self.function = function + self.args = args + self.kwargs = kwargs - job = scheduler.add_interval_job(func=func, weeks=weeks, days=days, - hours=hours, minutes=minutes, seconds=seconds, - start_date=start_date, args=args, kwargs=kwargs, **options) + def stop(self): + self.scheduler.stop_job(self) - registered_jobs[name] = {'title': title, 'job': job} + @property + def running(self): + if self.scheduler: + return self.scheduler.running + else: + return False + + @property + def start_date(self): + return self._job.trigger.start_date -def remove_job(name): - if name in registered_jobs: - scheduler.unschedule_job(registered_jobs[name]['job']) - registered_jobs.pop(name) +class IntervalJob(SchedulerJobBase): + job_type = _(u'Interval job') + + def 
start(self, scheduler): + scheduler.add_job(self) -def get_job_list(): - return registered_jobs.values() +class DateJob(SchedulerJobBase): + job_type = _(u'Date job') + + def start(self, scheduler): + scheduler.add_job(self) + + +class LocalScheduler(object): + scheduler_registry = {} + _lockdown = False + + @classmethod + def get(cls, name): + return cls.scheduler_registry[name] + + @classmethod + def get_all(cls): + return cls.scheduler_registry.values() + + @classmethod + def shutdown_all(cls): + for scheduler in cls.scheduler_registry.values(): + scheduler.stop() + + @classmethod + def lockdown(cls): + cls._lockdown = True + + def __init__(self, name, label=None): + self.scheduled_jobs = {} + self._scheduler = None + self.name = name + self.label = label + self.__class__.scheduler_registry[self.name] = self + + def start(self): + logger.debug('starting scheduler: %s' % self.name) + if not self.__class__._lockdown: + self._scheduler = OriginalScheduler() + for job in self.scheduled_jobs.values(): + self._schedule_job(job) + + self._scheduler.start() + else: + logger.debug('lockdown in effect') + + def stop(self): + if self._scheduler: + self._scheduler.shutdown() + del self._scheduler + self._scheduler = None + + @property + def running(self): + if self._scheduler: + return self._scheduler.running + else: + return False + + def clear(self): + for job in self.scheduled_jobs.values(): + self.stop_job(job) + + def stop_job(self, job): + self._scheduler.unschedule_job(job._job) + del(self.scheduled_jobs[job.name]) + job.scheduler = None + + def _schedule_job(self, job): + if isinstance(job, IntervalJob): + job._job = self._scheduler.add_interval_job(job.function, *job.args, **job.kwargs) + elif isinstance(job, DateJob): + job._job = self._scheduler.add_date_job(job.function, *job.args, **job.kwargs) + else: + raise UnknownJobClass + + def add_job(self, job): + if job.scheduler or job.name in self.scheduled_jobs.keys(): + raise AlreadyScheduled + + if self._scheduler: + 
self._schedule_job(job) + + job.scheduler = self + self.scheduled_jobs[job.name] = job + + def add_interval_job(self, name, label, function, *args, **kwargs): + job = IntervalJob(name=name, label=label, function=function, *args, **kwargs) + self.add_job(job) + return job + + def add_date_job(self, name, label, function, *args, **kwargs): + job = DateJob(name=name, label=label, function=function, *args, **kwargs) + self.add_job(job) + return job + + def get_job_list(self): + return self.scheduled_jobs.values() + + def __unicode__(self): + return unicode(self.label or self.name) diff --git a/apps/scheduler/exceptions.py b/apps/scheduler/exceptions.py index f30d9fe815..6d6515a76c 100644 --- a/apps/scheduler/exceptions.py +++ b/apps/scheduler/exceptions.py @@ -1,2 +1,14 @@ class AlreadyScheduled(Exception): + """
 + Raised when trying to schedule a Job instance of anything after it was + already scheduled in any other scheduler + """ + pass + + +class UnknownJobClass(Exception): + """ + Raised when trying to schedule a Job that is not of a type: + IntervalJob or DateJob + """ pass diff --git a/apps/scheduler/links.py b/apps/scheduler/links.py index 7808f3331c..9ad1a1ff10 100644 --- a/apps/scheduler/links.py +++ b/apps/scheduler/links.py @@ -4,6 +4,8 @@ from django.utils.translation import ugettext_lazy as _ from navigation.api import Link -from .permissions import PERMISSION_VIEW_JOB_LIST +from .permissions import PERMISSION_VIEW_JOB_LIST, PERMISSION_VIEW_SCHEDULER_LIST -job_list = Link(text=_(u'interval job list'), view='job_list', icon='time.png', permissions=[PERMISSION_VIEW_JOB_LIST]) +scheduler_tool_link = Link(text=_(u'local schedulers'), view='scheduler_list', icon='time.png', permissions=[PERMISSION_VIEW_SCHEDULER_LIST]) +scheduler_list = Link(text=_(u'scheduler list'), view='scheduler_list', sprite='time', permissions=[PERMISSION_VIEW_SCHEDULER_LIST]) +job_list = Link(text=_(u'interval job list'), view='job_list', args='object.name', 
sprite='timeline_marker', permissions=[PERMISSION_VIEW_JOB_LIST]) diff --git a/apps/scheduler/literals.py b/apps/scheduler/literals.py new file mode 100644 index 0000000000..b56b5148d7 --- /dev/null +++ b/apps/scheduler/literals.py @@ -0,0 +1 @@ +SHUTDOWN_COMMANDS = ['syncdb', 'migrate', 'schemamigration', 'datamigration', 'collectstatic', 'shell', 'shell_plus'] diff --git a/apps/scheduler/permissions.py b/apps/scheduler/permissions.py index 203f675ff4..2ef2343811 100644 --- a/apps/scheduler/permissions.py +++ b/apps/scheduler/permissions.py @@ -5,4 +5,5 @@ from django.utils.translation import ugettext_lazy as _ from permissions.models import PermissionNamespace, Permission namespace = PermissionNamespace('scheduler', _(u'Scheduler')) -PERMISSION_VIEW_JOB_LIST = Permission.objects.register(namespace, 'jobs_list', _(u'View the interval job list')) +PERMISSION_VIEW_SCHEDULER_LIST = Permission.objects.register(namespace, 'schedulers_list', _(u'View the local scheduler list')) +PERMISSION_VIEW_JOB_LIST = Permission.objects.register(namespace, 'jobs_list', _(u'View the local scheduler job list')) diff --git a/apps/scheduler/runtime.py b/apps/scheduler/runtime.py deleted file mode 100644 index a9440e946b..0000000000 --- a/apps/scheduler/runtime.py +++ /dev/null @@ -1,4 +0,0 @@ -from apscheduler.scheduler import Scheduler - -scheduler = Scheduler() -scheduler.start() diff --git a/apps/scheduler/urls.py b/apps/scheduler/urls.py index fde9602994..3630a34bbc 100644 --- a/apps/scheduler/urls.py +++ b/apps/scheduler/urls.py @@ -1,5 +1,6 @@ from django.conf.urls.defaults import patterns, url urlpatterns = patterns('scheduler.views', - url(r'^list/$', 'job_list', (), 'job_list'), + url(r'^scheduler/list/$', 'scheduler_list', (), 'scheduler_list'), + url(r'^scheduler/(?P\w+)/job/list/$', 'job_list', (), 'job_list'), ) diff --git a/apps/scheduler/views.py b/apps/scheduler/views.py index 597632eca7..fb622ce818 100644 --- a/apps/scheduler/views.py +++ b/apps/scheduler/views.py @@ 
-3,32 +3,67 @@ from __future__ import absolute_import from django.shortcuts import render_to_response from django.template import RequestContext from django.utils.translation import ugettext_lazy as _ +from django.http import Http404 from permissions.models import Permission -from common.utils import encapsulate -from .permissions import PERMISSION_VIEW_JOB_LIST -from .api import get_job_list +from .permissions import PERMISSION_VIEW_SCHEDULER_LIST, PERMISSION_VIEW_JOB_LIST +from .api import LocalScheduler -def job_list(request): - Permission.objects.check_permissions(request.user, [PERMISSION_VIEW_JOB_LIST]) +def scheduler_list(request): + Permission.objects.check_permissions(request.user, [PERMISSION_VIEW_SCHEDULER_LIST]) context = { - 'object_list': get_job_list(), - 'title': _(u'interval jobs'), + 'object_list': LocalScheduler.get_all(), + 'title': _(u'local schedulers'), 'extra_columns': [ + { + 'name': _(u'name'), + 'attribute': 'name' + }, { 'name': _(u'label'), - 'attribute': encapsulate(lambda job: job['title']) + 'attribute': 'label' + }, + { + 'name': _(u'running'), + 'attribute': 'running' + }, + ], + 'hide_object': True, + } + + return render_to_response('generic_list.html', context, + context_instance=RequestContext(request)) + + +def job_list(request, scheduler_name): + Permission.objects.check_permissions(request.user, [PERMISSION_VIEW_JOB_LIST]) + try: + scheduler = LocalScheduler.get(scheduler_name) + except: + raise Http404 + + context = { + 'object_list': scheduler.get_job_list(), + 'title': _(u'local jobs in scheduler: %s') % scheduler, + 'extra_columns': [ + { + 'name': _(u'name'), + 'attribute': 'name' + }, + { + 'name': _(u'label'), + 'attribute': 'label' }, { 'name': _(u'start date time'), - 'attribute': encapsulate(lambda job: job['job'].trigger.start_date) + 'attribute': 'start_date' }, { - 'name': _(u'interval'), - 'attribute': encapsulate(lambda job: job['job'].trigger.interval) + 'name': _(u'type'), + 'attribute': 'job_type' }, ], 
'hide_object': True, diff --git a/apps/sources/__init__.py b/apps/sources/__init__.py index 4daa0ccd04..6ed77c392b 100644 --- a/apps/sources/__init__.py +++ b/apps/sources/__init__.py @@ -6,7 +6,7 @@ from navigation.api import (bind_links, register_model_list_columns) from common.utils import encapsulate from project_setup.api import register_setup -from scheduler.api import register_interval_job +from scheduler.api import LocalScheduler from documents.models import Document from .staging import StagingFile @@ -62,8 +62,10 @@ register_model_list_columns(StagingFile, [ register_setup(setup_sources) -register_interval_job('task_fetch_pop3_emails', _(u'Connects to the POP3 email sources and fetches the attached documents.'), task_fetch_pop3_emails, seconds=EMAIL_PROCESSING_INTERVAL) -register_interval_job('task_fetch_imap_emails', _(u'Connects to the IMAP email sources and fetches the attached documents.'), task_fetch_imap_emails, seconds=EMAIL_PROCESSING_INTERVAL) +sources_scheduler = LocalScheduler('sources', _(u'Document sources')) +sources_scheduler.add_interval_job('task_fetch_pop3_emails', _(u'Connects to the POP3 email sources and fetches the attached documents.'), task_fetch_pop3_emails, seconds=EMAIL_PROCESSING_INTERVAL) +sources_scheduler.add_interval_job('task_fetch_imap_emails', _(u'Connects to the IMAP email sources and fetches the attached documents.'), task_fetch_imap_emails, seconds=EMAIL_PROCESSING_INTERVAL) +sources_scheduler.start() bind_links(['document_list_recent', 'document_list', 'document_create', 'document_create_multiple', 'upload_interactive', 'staging_file_delete'], [document_create_multiple], menu_name='secondary_menu') bind_links([Document], [document_create_multiple], menu_name='secondary_menu') diff --git a/apps/sources/models.py b/apps/sources/models.py index 31b89b66f0..9c779eebe7 100644 --- a/apps/sources/models.py +++ b/apps/sources/models.py @@ -27,7 +27,6 @@ from converter.literals import DIMENSION_SEPARATOR from documents.models 
import Document, DocumentType from documents.events import history_document_created from metadata.api import save_metadata_list -from scheduler.api import register_interval_job, remove_job from acls.utils import apply_default_acls from .managers import SourceTransformationManager, SourceLogManager @@ -43,6 +42,7 @@ from .literals import (SOURCE_CHOICES, SOURCE_CHOICES_PLURAL, IMAP_DEFAULT_MAILBOX) from .compressed_file import CompressedFile, NotACompressedFile from .conf.settings import POP3_TIMEOUT +#from . import sources_scheduler logger = logging.getLogger(__name__) @@ -441,17 +441,18 @@ class WatchFolder(BaseModel): interval = models.PositiveIntegerField(verbose_name=_(u'interval'), help_text=_(u'Inverval in seconds where the watch folder path is checked for new documents.')) def save(self, *args, **kwargs): - if self.pk: - remove_job(self.internal_name()) + #if self.pk: + # remove_job(self.internal_name()) super(WatchFolder, self).save(*args, **kwargs) self.schedule() def schedule(self): - if self.enabled: - register_interval_job(self.internal_name(), - title=self.fullname(), func=self.execute, - kwargs={'source_id': self.pk}, seconds=self.interval - ) + pass + #if self.enabled: + # sources_scheduler.add_interval_job(self.internal_name(), + # title=self.fullname(), function=self.execute, + # seconds=self.interval, kwargs={'source_id': self.pk} + # ) def execute(self, source_id): source = WatchFolder.objects.get(pk=source_id) diff --git a/docs/releases/0.13.rst b/docs/releases/0.13.rst index 027d42f17b..f3313869b1 100644 --- a/docs/releases/0.13.rst +++ b/docs/releases/0.13.rst @@ -73,10 +73,13 @@ Afterwards migrate existing database schema with:: $ ./manage.py migrate metadata 0001 --fake $ ./manage.py migrate acls 0001 --fake $ ./manage.py migrate ocr 0001 --fake + $ ./manage.py migrate ocr $ ./manage.py migrate history 0001 --fake $ ./manage.py migrate tags 0001 --fake $ ./manage.py migrate linking 0001 --fake $ ./manage.py migrate lock_manager 0001 --fake + 
$ ./manage.py migrate job_processor + $ ./manage.py migrate clustering Issue the following command to index existing documents in the new full text search database:: diff --git a/fabfile/templates/settings_local.py b/fabfile/templates/settings_local.py new file mode 100644 index 0000000000..633f25e6c8 --- /dev/null +++ b/fabfile/templates/settings_local.py @@ -0,0 +1,10 @@ +DATABASES = { + 'default': { + 'ENGINE': 'django.db.backends.%(django_database_driver)s', + 'NAME': '%(database_name)s', + 'USER': '%(database_username)s', + 'PASSWORD': '%(database_password)s', + 'HOST': '%(database_host)s', + 'PORT': '', + } +} diff --git a/settings.py b/settings.py index fa4bfb8ea5..aa9dc1e814 100644 --- a/settings.py +++ b/settings.py @@ -162,6 +162,7 @@ INSTALLED_APPS = ( 'converter', 'user_management', 'mimetype', + 'clustering', 'scheduler', 'job_processor', # Mayan EDMS @@ -187,7 +188,7 @@ INSTALLED_APPS = ( 'workflows', 'checkouts', 'rest_api', - 'bootstrap', + #'bootstrap', 'statistics', # Has to be last so the other apps can register it's signals diff --git a/urls.py b/urls.py index 4a5ebe61ec..82b10cc5d1 100644 --- a/urls.py +++ b/urls.py @@ -36,10 +36,12 @@ urlpatterns = patterns('', (r'^checkouts/', include('checkouts.urls')), (r'^installation/', include('installation.urls')), (r'^scheduler/', include('scheduler.urls')), + (r'^job_processing/', include('job_processor.urls')), (r'^bootstrap/', include('bootstrap.urls')), (r'^diagnostics/', include('diagnostics.urls')), (r'^maintenance/', include('maintenance.urls')), (r'^statistics/', include('statistics.urls')), + (r'^clustering/', include('clustering.urls')), )