Merge branch 'feature/ocr_document_version' into development

This commit is contained in:
Roberto Rosario
2012-08-01 01:47:40 -04:00
71 changed files with 1845 additions and 647 deletions

20
.gitignore vendored
View File

@@ -1,15 +1,15 @@
*.orig
*.pyc
*.pyo
site_media/photologue/photos/*
site_media/photologue/photos/cache/*
*.sqlite
settings_local.py
site_media/documents/*
celerybeat-schedule
document_storage/
misc/mayan.geany
image_cache/
/*.sqlite
/settings_local.py
/celerybeat-schedule
/document_storage/
/misc/mayan.geany
/image_cache/
build/
_build/
gpg_home/
/gpg_home/
/static/
/whoosh_index/
/fabfile_install

View File

@@ -9,13 +9,13 @@ from documents.models import DocumentType, DocumentTypeFilename, Document
from metadata.models import MetadataType, MetadataSet
from document_indexing.models import Index, IndexTemplateNode
from sources.models import WebForm, StagingFolder
from ocr.models import QueueDocument, QueueTransformation, DocumentQueue
from history.models import History
from taggit.models import Tag
from tags.models import TagProperties
from folders.models import Folder
from dynamic_search.models import RecentSearch
from django_gpg.runtime import gpg
# TODO: clear the job queues
bootstrap_options = {}
@@ -63,18 +63,6 @@ def nuke_database():
for obj in Role.objects.all():
obj.delete()
# Delete all document in the ocr queue
for obj in QueueDocument.objects.all():
obj.delete()
# Delete all the transformations for a queue
for obj in QueueTransformation.objects.all():
obj.delete()
# Delete all the ocr document queues
for obj in DocumentQueue.objects.all():
obj.delete()
# Delete all the remaining history events
for obj in History.objects.all():
obj.delete()

View File

@@ -3,7 +3,7 @@ from __future__ import absolute_import
from django.utils.translation import ugettext_lazy as _
from navigation.api import bind_links, register_top_menu
from scheduler.api import register_interval_job
from scheduler.api import LocalScheduler
from documents.models import Document
from acls.api import class_permissions
@@ -14,6 +14,7 @@ from .permissions import (PERMISSION_DOCUMENT_CHECKOUT,
from .links import checkout_list, checkout_document, checkout_info, checkin_document
from .models import DocumentCheckout
from .tasks import task_check_expired_check_outs
from .literals import CHECK_EXPIRED_CHECK_OUTS_INTERVAL
def initialize_document_checkout_extra_methods():
@@ -34,6 +35,8 @@ class_permissions(Document, [
PERMISSION_DOCUMENT_RESTRICTIONS_OVERRIDE
])
CHECK_EXPIRED_CHECK_OUTS_INTERVAL = 60 # Lowest check out expiration allowed
register_interval_job('task_check_expired_check_outs', _(u'Check expired check out documents and checks them in.'), task_check_expired_check_outs, seconds=CHECK_EXPIRED_CHECK_OUTS_INTERVAL)
checkouts_scheduler = LocalScheduler('checkouts', _(u'Document checkouts'))
checkouts_scheduler.add_interval_job('task_check_expired_check_outs', _(u'Check expired check out documents and checks them in.'), task_check_expired_check_outs, seconds=CHECK_EXPIRED_CHECK_OUTS_INTERVAL)
checkouts_scheduler.start()
initialize_document_checkout_extra_methods()

View File

@@ -14,3 +14,5 @@ STATE_LABELS = {
STATE_CHECKED_OUT: _(u'checked out'),
STATE_CHECKED_IN: _(u'checked in/available'),
}
CHECK_EXPIRED_CHECK_OUTS_INTERVAL = 60 # Lowest check out expiration allowed

View File

@@ -0,0 +1,19 @@
from __future__ import absolute_import

from django.utils.translation import ugettext_lazy as _

from scheduler.api import LocalScheduler
from navigation.api import bind_links
from project_tools.api import register_tool

from .tasks import node_heartbeat, house_keeping
from .links import tool_link, node_list
from .models import Node, ClusteringConfig

# Periodic cluster jobs: publish this node's vitals and prune dead nodes.
clustering_scheduler = LocalScheduler('clustering', _(u'Clustering'))
# Heartbeat interval comes from the ClusteringConfig singleton, read once at
# import time — NOTE(review): later config changes won't apply until restart.
clustering_scheduler.add_interval_job('node_heartbeat', _(u'Update a node\'s properties.'), node_heartbeat, seconds=ClusteringConfig.get().node_heartbeat_interval)
# NOTE(review): house keeping runs every second — confirm this is intended.
clustering_scheduler.add_interval_job('house_keeping', _(u'Check for unresponsive nodes in the cluster list.'), house_keeping, seconds=1)
clustering_scheduler.start()

register_tool(tool_link)

bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu')

13
apps/clustering/admin.py Normal file
View File

@@ -0,0 +1,13 @@
from __future__ import absolute_import

from django.contrib import admin
from django.utils.translation import ugettext_lazy as _

from .models import Node


class NodeAdmin(admin.ModelAdmin):
    # Show every vital statistic of a node in the admin changelist.
    list_display = ('hostname', 'cpuload', 'heartbeat', 'memory_usage')


admin.site.register(Node, NodeAdmin)

10
apps/clustering/links.py Normal file
View File

@@ -0,0 +1,10 @@
from __future__ import absolute_import

from django.utils.translation import ugettext_lazy as _

from navigation.api import Link

from .permissions import (PERMISSION_NODES_VIEW)

# Tools-menu entry and the node-list navigation link; both gated on the
# node-view permission.
tool_link = Link(text=_(u'clustering'), view='node_list', icon='server.png', permissions=[PERMISSION_NODES_VIEW])  # children_view_regex=[r'^index_setup', r'^template_node'])
node_list = Link(text=_(u'node list'), view='node_list', sprite='server', permissions=[PERMISSION_NODES_VIEW])

View File

@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models


class Migration(SchemaMigration):
    """Auto-generated South migration: create the clustering Node table."""

    def forwards(self, orm):
        # Adding model 'Node'
        db.create_table('clustering_node', (
            ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
            ('hostname', self.gf('django.db.models.fields.CharField')(max_length=255)),
            ('cpuload', self.gf('django.db.models.fields.PositiveIntegerField')(default=0, blank=True)),
            ('heartbeat', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(2012, 7, 30, 0, 0), blank=True)),
            ('memory_usage', self.gf('django.db.models.fields.FloatField')(blank=True)),
        ))
        db.send_create_signal('clustering', ['Node'])

    def backwards(self, orm):
        # Deleting model 'Node'
        db.delete_table('clustering_node')

    # Frozen ORM snapshot used by South; do not edit by hand.
    models = {
        'clustering.node': {
            'Meta': {'object_name': 'Node'},
            'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}),
            'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
            'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'})
        }
    }

    complete_apps = ['clustering']

View File

@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models


class Migration(SchemaMigration):
    """Auto-generated South migration: create the ClusteringConfig singleton table."""

    def forwards(self, orm):
        # Adding model 'ClusteringConfig'
        db.create_table('clustering_clusteringconfig', (
            ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
            ('lock_id', self.gf('django.db.models.fields.CharField')(default=1, unique=True, max_length=1)),
            ('node_time_to_live', self.gf('django.db.models.fields.PositiveIntegerField')()),
            ('node_heartbeat_interval', self.gf('django.db.models.fields.PositiveIntegerField')()),
        ))
        db.send_create_signal('clustering', ['ClusteringConfig'])

    def backwards(self, orm):
        # Deleting model 'ClusteringConfig'
        db.delete_table('clustering_clusteringconfig')

    # Frozen ORM snapshot used by South; do not edit by hand.
    models = {
        'clustering.clusteringconfig': {
            'Meta': {'object_name': 'ClusteringConfig'},
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'}),
            'node_heartbeat_interval': ('django.db.models.fields.PositiveIntegerField', [], {}),
            'node_time_to_live': ('django.db.models.fields.PositiveIntegerField', [], {})
        },
        'clustering.node': {
            'Meta': {'object_name': 'Node'},
            'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}),
            'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
            'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'})
        }
    }

    complete_apps = ['clustering']

View File

@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models


class Migration(SchemaMigration):
    """Auto-generated South migration: widen Node.cpuload from integer to float."""

    def forwards(self, orm):
        # Changing field 'Node.cpuload'
        db.alter_column('clustering_node', 'cpuload', self.gf('django.db.models.fields.FloatField')())

    def backwards(self, orm):
        # Changing field 'Node.cpuload'
        db.alter_column('clustering_node', 'cpuload', self.gf('django.db.models.fields.PositiveIntegerField')())

    # Frozen ORM snapshot used by South; do not edit by hand.
    models = {
        'clustering.clusteringconfig': {
            'Meta': {'object_name': 'ClusteringConfig'},
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'}),
            'node_heartbeat_interval': ('django.db.models.fields.PositiveIntegerField', [], {'default': '1'}),
            'node_time_to_live': ('django.db.models.fields.PositiveIntegerField', [], {'default': '5'})
        },
        'clustering.node': {
            'Meta': {'object_name': 'Node'},
            'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}),
            'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
            'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'})
        }
    }

    complete_apps = ['clustering']

View File

89
apps/clustering/models.py Normal file
View File

@@ -0,0 +1,89 @@
from __future__ import absolute_import
import os
import datetime
import platform
import psutil
from django.db import models, IntegrityError, transaction
from django.db import close_connection
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ugettext
from common.models import Singleton
DEFAULT_NODE_TTL = 5
DEFAULT_NODE_HEARTBEAT_INTERVAL = 1
class NodeManager(models.Manager):
    def myself(self):
        """Return the Node row for the local host, creating it if needed.

        The node's statistics are refreshed in memory; they are only
        persisted here when the row is newly created (the periodic
        heartbeat task saves on every tick otherwise).
        """
        node, created = self.model.objects.get_or_create(hostname=platform.node())
        node.refresh()
        if created:
            # Store the refreshed data because this is a new instance
            node.save()
        return node
class Node(models.Model):
    """A machine participating in the Mayan cluster, identified by hostname."""
    hostname = models.CharField(max_length=255, verbose_name=_(u'hostname'))
    cpuload = models.FloatField(blank=True, default=0.0, verbose_name=_(u'cpu load'))
    # BUG FIX: the default must be the *callable* datetime.datetime.now, not
    # its result — calling now() here would freeze a single timestamp at
    # module import time for every subsequently created row (the frozen
    # datetime.datetime(2012, 7, 30, 0, 0) baked into the migrations is this
    # bug made visible).
    heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now, verbose_name=_(u'last heartbeat check'))
    memory_usage = models.FloatField(blank=True, default=0.0, verbose_name=_(u'memory usage'))

    objects = NodeManager()

    @classmethod
    def platform_info(cls):
        """Return the current CPU load and memory usage of the local host."""
        # NOTE(review): psutil.phymem_usage() was deprecated in later psutil
        # releases in favor of psutil.virtual_memory() — confirm the pinned
        # psutil version before upgrading.
        return {
            'cpuload': psutil.cpu_percent(),
            'memory_usage': psutil.phymem_usage().percent
        }

    def __unicode__(self):
        return self.hostname

    def refresh(self):
        """Update this node's load statistics in memory (local host only)."""
        if self.hostname == platform.node():
            # Make sure we can only update ourselves
            info = Node.platform_info()
            self.cpuload = info['cpuload']
            self.memory_usage = info['memory_usage']

    def save(self, *args, **kwargs):
        # Every save doubles as a heartbeat: stamp the current time.
        self.heartbeat = datetime.datetime.now()
        return super(Node, self).save(*args, **kwargs)

    class Meta:
        verbose_name = _(u'node')
        verbose_name_plural = _(u'nodes')
class ClusteringConfigManager(models.Manager):
    """Queries over nodes whose heartbeat has expired."""

    def dead_nodes(self):
        """Queryset of nodes silent for longer than the configured TTL."""
        cutoff = datetime.datetime.now() - datetime.timedelta(
            seconds=self.model.get().node_time_to_live)
        return Node.objects.filter(heartbeat__lt=cutoff)

    def delete_dead_nodes(self):
        """Purge every expired node from the cluster list."""
        self.dead_nodes().delete()

    def zombiest_node(self):
        """Return the most recently expired node, or None when none exist."""
        for candidate in self.dead_nodes().order_by('-heartbeat')[:1]:
            return candidate
        return None
class ClusteringConfig(Singleton):
    """Singleton holding cluster-wide timing configuration."""
    # BUG FIX: both verbose_name values were bare parenthesized strings,
    # missing the _() translation wrapper used on every other field in this
    # module — they would never be translated.
    # After this time (seconds) without a heartbeat a worker is considered dead.
    node_time_to_live = models.PositiveIntegerField(verbose_name=_(u'time to live (in seconds)'), default=DEFAULT_NODE_TTL)
    node_heartbeat_interval = models.PositiveIntegerField(verbose_name=_(u'heartbeat interval'), default=DEFAULT_NODE_HEARTBEAT_INTERVAL)
    # TODO: add validation, interval cannot be greater than TTL

    objects = ClusteringConfigManager()

    def __unicode__(self):
        return ugettext('clustering config')

    class Meta:
        verbose_name = verbose_name_plural = _(u'clustering config')

View File

@@ -0,0 +1,8 @@
from __future__ import absolute_import

from django.utils.translation import ugettext_lazy as _

from permissions.models import PermissionNamespace, Permission

# Namespace grouping every clustering-related permission.
namespace = PermissionNamespace('clustering', _(u'Clustering'))

# Grants read access to the cluster's node list views.
PERMISSION_NODES_VIEW = Permission.objects.register(namespace, 'nodes_view', _(u'View the nodes in a Mayan cluster'))

Binary file not shown.

After

Width:  |  Height:  |  Size: 997 B

25
apps/clustering/tasks.py Normal file
View File

@@ -0,0 +1,25 @@
from __future__ import absolute_import
import logging
from lock_manager.decorators import simple_locking
from .models import Node, ClusteringConfig
# Seconds a task lock is held before it expires on its own.
LOCK_EXPIRE = 10

logger = logging.getLogger(__name__)


# CONSISTENCY FIX: both decorators previously hard-coded the literal 10 even
# though LOCK_EXPIRE was defined for exactly this purpose; use the constant
# so a single edit changes every lock timeout.
@simple_locking('node_heartbeat', LOCK_EXPIRE)
def node_heartbeat():
    """Record that this node is alive by refreshing and saving its Node row."""
    logger.debug('starting')
    node = Node.objects.myself()
    # Node.save() refreshes the heartbeat timestamp as a side effect.
    node.save()


@simple_locking('house_keeping', LOCK_EXPIRE)
def house_keeping():
    """Remove nodes whose heartbeat is older than the configured TTL."""
    logger.debug('starting')
    ClusteringConfig.objects.delete_dead_nodes()

6
apps/clustering/urls.py Normal file
View File

@@ -0,0 +1,6 @@
from django.conf.urls.defaults import patterns, url

urlpatterns = patterns('clustering.views',
    # Read-only listing of the cluster's nodes.
    url(r'^node/list/$', 'node_list', (), 'node_list'),
)

67
apps/clustering/views.py Normal file
View File

@@ -0,0 +1,67 @@
from __future__ import absolute_import
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext_lazy as _
from django.shortcuts import get_object_or_404
from django.db.models.loading import get_model
from django.http import Http404
from django.core.exceptions import PermissionDenied
from permissions.models import Permission
from common.utils import encapsulate
from acls.models import AccessEntry
from .models import Node
from .permissions import PERMISSION_NODES_VIEW
def node_list(request):
    """Render the list of all cluster nodes with their vital statistics."""
    Permission.objects.check_permissions(request.user, [PERMISSION_NODES_VIEW])

    # Build the extra column descriptors from (label, attribute) pairs.
    column_spec = (
        (_(u'hostname'), 'hostname'),
        (_(u'cpu load'), 'cpuload'),
        (_(u'heartbeat'), 'heartbeat'),
        (_(u'memory usage'), 'memory_usage'),
    )
    extra_columns = [
        {'name': label, 'attribute': attribute}
        for label, attribute in column_spec
    ]

    template_context = {
        'object_list': Node.objects.all(),
        'title': _(u'nodes'),
        'extra_columns_preffixed': extra_columns,
        'hide_object': True,
    }

    return render_to_response(
        'generic_list.html', template_context,
        context_instance=RequestContext(request))
def node_workers(request, node_pk):
    """List the workers registered to a single cluster node.

    Falls back to a per-object ACL check when the user lacks the global
    node-view permission.
    """
    node = get_object_or_404(Node, pk=node_pk)
    try:
        Permission.objects.check_permissions(request.user, [PERMISSION_NODES_VIEW])
    except PermissionDenied:
        # No global permission: allow access if an ACL grants it for this node.
        AccessEntry.objects.check_access(PERMISSION_NODES_VIEW, request.user, node)

    context = {
        # BUG FIX: 'workers' is attached to Node via add_to_class as a
        # callable returning node.worker_set (see the job_processor app,
        # which also uses it as x.workers().all()), so it must be invoked
        # before .all(); the previous node.workers.all() accessed .all on
        # the bound method itself and raised AttributeError.
        'object_list': node.workers().all(),
        'title': _(u'workers for node: %s') % node,
        'object': node,
        'hide_object': True,
    }

    return render_to_response('generic_list.html', context,
        context_instance=RequestContext(request))

View File

@@ -17,7 +17,11 @@ class Singleton(models.Model):
lock_id = models.CharField(max_length=1, default=SINGLETON_LOCK_ID, editable=False, verbose_name=_(u'lock field'), unique=True)
objects = SingletonManager()
    @classmethod
    def get(cls):
        """Return the single persisted instance of this model.

        Raises DoesNotExist when no row exists yet — presumably the
        SingletonManager guarantees creation; TODO confirm against the
        manager's implementation (not visible in this hunk).
        """
        return cls.objects.get()
    def save(self, *args, **kwargs):
        # Pin the primary key so every save overwrites the same row,
        # guaranteeing at most one instance exists per subclass table.
        self.id = 1
        super(Singleton, self).save(*args, **kwargs)

View File

@@ -9,9 +9,8 @@ from django.core.management import call_command
from navigation.api import register_sidebar_template, bind_links, Link
from documents.models import Document
from scheduler.runtime import scheduler
from scheduler.api import LocalScheduler
from signaler.signals import post_update_index, pre_update_index
from scheduler.api import register_interval_job
from lock_manager import Lock, LockError
from .models import IndexableObject
@@ -36,7 +35,7 @@ def scheduler_shutdown_pre_update_index(sender, mayan_runtime, **kwargs):
# Only shutdown the scheduler if the command is called from the command
# line
if not mayan_runtime:
scheduler.shutdown()
LocalScheduler.shutdown_all()
def search_index_update():
@@ -61,4 +60,6 @@ def search_index_update():
bind_links(['search', 'search_advanced', 'results'], [search], menu_name='form_header')
bind_links(['results'], [search_again], menu_name='sidebar')
register_interval_job('search_index_update', _(u'Update the search index with the most recent modified documents.'), search_index_update, seconds=INDEX_UPDATE_INTERVAL)
dynamic_search_scheduler = LocalScheduler('search', _(u'Search'))
dynamic_search_scheduler.add_interval_job('search_index_update', _(u'Update the search index with the most recent modified documents.'), search_index_update, seconds=INDEX_UPDATE_INTERVAL)
dynamic_search_scheduler.start()

View File

@@ -0,0 +1,37 @@
from __future__ import absolute_import
from django.utils.translation import ugettext_lazy as _
from scheduler.api import LocalScheduler
from navigation.api import bind_links, register_model_list_columns
from project_tools.api import register_tool
from common.utils import encapsulate
from clustering.models import Node
from .models import JobQueue
from .tasks import job_queue_poll
from .links import (node_workers, job_queues, tool_link,
job_queue_items_pending, job_queue_items_error, job_queue_items_active)
#TODO: fix this, make it cluster wide
JOB_QUEUE_POLL_INTERVAL = 1
job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor'))
job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JOB_QUEUE_POLL_INTERVAL)
job_processor_scheduler.start()
register_tool(tool_link)
bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu')
bind_links([JobQueue], [job_queue_items_pending, job_queue_items_active, job_queue_items_error])
bind_links([Node], [node_workers])
Node.add_to_class('workers', lambda node: node.worker_set)
register_model_list_columns(Node, [
{
'name': _(u'active workers'),
'attribute': encapsulate(lambda x: x.workers().all().count())
},
])

View File

@@ -0,0 +1,23 @@
from __future__ import absolute_import

from django.contrib import admin
from django.utils.translation import ugettext_lazy as _

from .models import JobQueue, JobQueueItem


class JobQueueItemInline(admin.StackedInline):
    # Edit a queue's items inline on the JobQueue change page.
    model = JobQueueItem


class JobQueueAdmin(admin.ModelAdmin):
    model = JobQueue
    # NOTE(review): 'label' must exist as a field or attribute on JobQueue —
    # it is not visible in this hunk; confirm against the model definition.
    list_display = ('name', 'label', 'total_items')
    inlines = [JobQueueItemInline]

    def total_items(self, obj):
        """Number of items currently in this queue, in any state."""
        return obj.items.all().count()
    total_items.short_description = _(u'total items')


admin.site.register(JobQueue, JobQueueAdmin)

View File

@@ -1,2 +0,0 @@
def process_job(func, *args, **kwargs):
return func(*args, **kwargs)

View File

@@ -0,0 +1,14 @@
# Exceptions raised by the job_processor app.
# (Removed the commented-out WorkerAlreadyDisabled/WorkerAlreadyEnabled
# stubs: dead code that was never defined anywhere.)


class JobQueuePushError(Exception):
    """Raised when a job cannot be pushed onto a job queue."""
    pass


class JobQueueNoPendingJobs(Exception):
    """Raised when a queue poll finds no pending jobs to process."""
    pass

View File

@@ -0,0 +1,18 @@
from __future__ import absolute_import

from django.utils.translation import ugettext_lazy as _

from navigation.api import Link

from clustering.permissions import PERMISSION_NODES_VIEW
from .permissions import PERMISSION_JOB_QUEUE_VIEW

# Per-node link to that node's worker list ('object' resolves to a Node).
node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW])

# Tools-menu entry plus queue-level navigation ('object' is a JobQueue).
tool_link = Link(text=_(u'job queues'), view='job_queues', icon='hourglass.png', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queues = Link(text=_(u'job queue list'), view='job_queues', sprite='hourglass', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])

View File

@@ -0,0 +1,19 @@
from django.utils.translation import ugettext_lazy as _

# Single-character state codes stored in the Worker.state column.
WORKER_STATE_RUNNING = 'r'
WORKER_STATE_DEAD = 'd'

WORKER_STATE_CHOICES = (
    (WORKER_STATE_RUNNING, _(u'running')),
    (WORKER_STATE_DEAD, _(u'dead')),
)

# Single-character state codes stored in the JobQueueItem.state column.
# Note JOB_STATE_PROCESSING shares the letter 'r' with WORKER_STATE_RUNNING;
# the two sets live in different columns, so there is no conflict.
JOB_STATE_PENDING = 'p'
JOB_STATE_PROCESSING = 'r'
JOB_STATE_ERROR = 'e'

JOB_STATE_CHOICES = (
    (JOB_STATE_PENDING, _(u'pending')),
    (JOB_STATE_PROCESSING, _(u'processing')),
    (JOB_STATE_ERROR, _(u'error')),
)

View File

@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding model 'Node'
db.create_table('job_processor_node', (
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('hostname', self.gf('django.db.models.fields.CharField')(max_length=255)),
('cpuload', self.gf('django.db.models.fields.PositiveIntegerField')(default=0, blank=True)),
('heartbeat', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(2012, 7, 30, 0, 0), blank=True)),
('memory_usage', self.gf('django.db.models.fields.FloatField')(blank=True)),
))
db.send_create_signal('job_processor', ['Node'])
# Adding model 'JobQueue'
db.create_table('job_processor_jobqueue', (
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('name', self.gf('django.db.models.fields.CharField')(unique=True, max_length=32)),
('unique_jobs', self.gf('django.db.models.fields.BooleanField')(default=True)),
))
db.send_create_signal('job_processor', ['JobQueue'])
# Adding model 'JobQueueItem'
db.create_table('job_processor_jobqueueitem', (
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('job_queue', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['job_processor.JobQueue'])),
('creation_datetime', self.gf('django.db.models.fields.DateTimeField')()),
('unique_id', self.gf('django.db.models.fields.CharField')(unique=True, max_length=64, blank=True)),
('job_type', self.gf('django.db.models.fields.CharField')(max_length=32)),
('kwargs', self.gf('django.db.models.fields.TextField')()),
('state', self.gf('django.db.models.fields.CharField')(default='p', max_length=4)),
('result', self.gf('django.db.models.fields.TextField')(blank=True)),
))
db.send_create_signal('job_processor', ['JobQueueItem'])
# Adding model 'Worker'
db.create_table('job_processor_worker', (
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('node', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['job_processor.Node'])),
('name', self.gf('django.db.models.fields.CharField')(max_length=255)),
('creation_datetime', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(2012, 7, 30, 0, 0))),
('heartbeat', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(2012, 7, 30, 0, 0), blank=True)),
('state', self.gf('django.db.models.fields.CharField')(default='r', max_length=4)),
))
db.send_create_signal('job_processor', ['Worker'])
def backwards(self, orm):
# Deleting model 'Node'
db.delete_table('job_processor_node')
# Deleting model 'JobQueue'
db.delete_table('job_processor_jobqueue')
# Deleting model 'JobQueueItem'
db.delete_table('job_processor_jobqueueitem')
# Deleting model 'Worker'
db.delete_table('job_processor_worker')
models = {
'job_processor.jobqueue': {
'Meta': {'object_name': 'JobQueue'},
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}),
'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'})
},
'job_processor.jobqueueitem': {
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'},
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}),
'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}),
'kwargs': ('django.db.models.fields.TextField', [], {}),
'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}),
'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'})
},
'job_processor.node': {
'Meta': {'object_name': 'Node'},
'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}),
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'})
},
'job_processor.worker': {
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'},
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)'}),
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.Node']"}),
'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'})
}
}
complete_apps = ['job_processor']

View File

@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Deleting model 'Node'
db.delete_table('job_processor_node')
# Changing field 'Worker.node'
db.alter_column('job_processor_worker', 'node_id', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['clustering.Node']))
def backwards(self, orm):
# Adding model 'Node'
db.create_table('job_processor_node', (
('memory_usage', self.gf('django.db.models.fields.FloatField')(blank=True)),
('hostname', self.gf('django.db.models.fields.CharField')(max_length=255)),
('cpuload', self.gf('django.db.models.fields.PositiveIntegerField')(default=0, blank=True)),
('heartbeat', self.gf('django.db.models.fields.DateTimeField')(default=datetime.datetime(2012, 7, 30, 0, 0), blank=True)),
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
))
db.send_create_signal('job_processor', ['Node'])
# Changing field 'Worker.node'
db.alter_column('job_processor_worker', 'node_id', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['job_processor.Node']))
models = {
'clustering.node': {
'Meta': {'object_name': 'Node'},
'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}),
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'})
},
'job_processor.jobqueue': {
'Meta': {'object_name': 'JobQueue'},
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}),
'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'})
},
'job_processor.jobqueueitem': {
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'},
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}),
'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}),
'kwargs': ('django.db.models.fields.TextField', [], {}),
'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}),
'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'})
},
'job_processor.worker': {
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'},
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)'}),
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}),
'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'})
}
}
complete_apps = ['job_processor']

View File

@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding field 'Worker.job_queue_item'
db.add_column('job_processor_worker', 'job_queue_item',
self.gf('django.db.models.fields.related.ForeignKey')(default=1, to=orm['job_processor.JobQueueItem']),
keep_default=False)
def backwards(self, orm):
# Deleting field 'Worker.job_queue_item'
db.delete_column('job_processor_worker', 'job_queue_item_id')
models = {
'clustering.node': {
'Meta': {'object_name': 'Node'},
'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}),
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'})
},
'job_processor.jobqueue': {
'Meta': {'object_name': 'JobQueue'},
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}),
'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'})
},
'job_processor.jobqueueitem': {
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'},
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}),
'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}),
'kwargs': ('django.db.models.fields.TextField', [], {}),
'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}),
'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'})
},
'job_processor.worker': {
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'},
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)'}),
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}),
'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'})
}
}
complete_apps = ['job_processor']

View File

@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
    """
    South schema migration: replaces the free-form ``Worker.name`` column
    with a numeric ``Worker.pid`` column (the worker's OS process id).
    """

    def forwards(self, orm):
        # Deleting field 'Worker.name'
        db.delete_column('job_processor_worker', 'name')
        # Adding field 'Worker.pid'
        # ``default=1`` backfills existing rows; ``keep_default=False``
        # drops the default again once the column exists.
        db.add_column('job_processor_worker', 'pid',
            self.gf('django.db.models.fields.PositiveIntegerField')(default=1, max_length=255),
            keep_default=False)

    def backwards(self, orm):
        # User chose to not deal with backwards NULL issues for 'Worker.name'
        raise RuntimeError("Cannot reverse this migration. 'Worker.name' and its values cannot be restored.")
        # NOTE: unreachable South-generated code below (kept as generated).
        # Deleting field 'Worker.pid'
        db.delete_column('job_processor_worker', 'pid')

    # Frozen ORM snapshot used by South when this migration runs.
    models = {
        'clustering.node': {
            'Meta': {'object_name': 'Node'},
            'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}),
            'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
            'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'})
        },
        'job_processor.jobqueue': {
            'Meta': {'object_name': 'JobQueue'},
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}),
            'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'})
        },
        'job_processor.jobqueueitem': {
            'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'},
            'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}),
            'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}),
            'kwargs': ('django.db.models.fields.TextField', [], {}),
            'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
            'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}),
            'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'})
        },
        'job_processor.worker': {
            'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'},
            'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)'}),
            'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}),
            'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}),
            'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}),
            'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'})
        }
    }

    complete_apps = ['job_processor']

View File

@@ -1,3 +1,241 @@
from django.db import models
from __future__ import absolute_import
# Create your models here.
import os
import datetime
import uuid
import hashlib
import platform
from multiprocessing import Process
import psutil
from django.db import models, IntegrityError, transaction
from django.db import close_connection
from django.contrib.contenttypes import generic
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ugettext
from django.utils.simplejson import loads, dumps
from common.models import Singleton
from clustering.models import Node
from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING,
JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES,
WORKER_STATE_RUNNING)
from .exceptions import JobQueuePushError, JobQueueNoPendingJobs
#from .exceptions import (WorkerAlreadyDisabled, WorkerAlreadyEnabled)
job_queue_labels = {}
job_types_registry = {}
class Job(object):
    """
    Executes a job queue item's function inside the current process.

    Instantiating this class runs the job to completion: a ``Worker`` row
    is registered so the cluster can see what is being processed, the job
    function runs inside a transaction, and afterwards the queue item is
    deleted on success or marked with the error on failure.

    NOTE(review): ``JobType.run`` uses this class as the target of a
    ``multiprocessing.Process``, which is why the database connection is
    repeatedly closed (forked children must not reuse the parent's
    connection) — confirm against the clustering setup.
    """

    def __init__(self, function, job_queue_item):
        close_connection()
        # Run sync or launch async subprocess
        # OR launch 2 processes: monitor & actual process
        node = Node.objects.myself()
        # Register this process as a worker while the job is running.
        worker = Worker.objects.create(node=node, pid=os.getpid(), job_queue_item=job_queue_item)
        try:
            close_connection()
            # Run the job function atomically with its stored kwargs.
            transaction.commit_on_success(function)(**loads(job_queue_item.kwargs))
            #function(**loads(job_queue_item.kwargs))
        except Exception, exc:
            close_connection()
            transaction.rollback()
            close_connection()

            # Record the failure on the queue item in its own transaction.
            def set_state_error():
                job_queue_item.result = exc
                job_queue_item.state = JOB_STATE_ERROR
                job_queue_item.save()
            transaction.commit_on_success(set_state_error)()
        else:
            # Success: the item has been processed and can be removed.
            job_queue_item.delete()
        finally:
            # The worker registration is only valid while processing.
            worker.delete()
class JobType(object):
    """
    A runnable job definition; every instance registers itself in the
    module level ``job_types_registry`` under its internal name so that
    ``JobQueueItem.get_job_type`` can resolve it later.
    """

    def __init__(self, name, label, function):
        self.name = name
        self.label = label
        self.function = function
        # Self-register for lookup by name.
        job_types_registry[self.name] = self

    def __unicode__(self):
        return unicode(self.label)

    def run(self, job_queue_item, **kwargs):
        """Mark the item as processing and execute it in a child process."""
        job_queue_item.state = JOB_STATE_PROCESSING
        job_queue_item.save()
        child = Process(target=Job, args=(self.function, job_queue_item,))
        child.start()
class JobQueueManager(models.Manager):
    """Manager that captures queue labels passed through ``get_or_create``."""

    def get_or_create(self, *args, **kwargs):
        # The label is not a database field; stash it in the runtime
        # registry keyed by queue name before delegating to Django.
        queue_name = kwargs.get('name')
        defaults = kwargs.get('defaults', {})
        job_queue_labels[queue_name] = defaults.get('label')
        return super(JobQueueManager, self).get_or_create(*args, **kwargs)
class JobQueue(models.Model):
    """
    A named queue to which jobs are pushed for asynchronous execution.

    The human readable label is not stored in the database; it lives in
    the module level ``job_queue_labels`` registry, keyed by ``name``.
    """
    # TODO: support for stopping and starting job queues
    # Internal name
    name = models.CharField(max_length=32, verbose_name=_(u'name'), unique=True)
    unique_jobs = models.BooleanField(verbose_name=_(u'unique jobs'), default=True)

    objects = JobQueueManager()

    def __unicode__(self):
        # Fix: the original returned ``unicode(self.label) or self.names``;
        # ``self.names`` was a typo, and ``unicode(None)`` is the truthy
        # string u'None', so the intended fallback never triggered.
        return unicode(self.label) if self.label else unicode(self.name)

    @property
    def label(self):
        # Labels live in a runtime registry, not in the database.
        return job_queue_labels.get(self.name)

    def push(self, job_type, **kwargs):  # TODO: add replace flag
        """
        Queue a new job of ``job_type`` with the given keyword arguments.

        Raises ``JobQueuePushError`` (via ``JobQueueItem.save``) when the
        queue enforces unique jobs and an identical job is already queued.
        """
        job_queue_item = JobQueueItem(job_queue=self, job_type=job_type.name, kwargs=dumps(kwargs))
        job_queue_item.save()
        return job_queue_item

    def get_oldest_pending_job(self):
        """
        Return the pending job that was queued first (FIFO), or raise
        ``JobQueueNoPendingJobs`` when the queue has none.

        Fix: the original ordered by ``-creation_datetime``, which
        returned the NEWEST pending job, contradicting both the method
        name and the poller's intent.
        """
        try:
            return self.pending_jobs.all().order_by('creation_datetime')[0]
        except IndexError:
            raise JobQueueNoPendingJobs

    @property
    def pending_jobs(self):
        return self.items.filter(state=JOB_STATE_PENDING)

    @property
    def error_jobs(self):
        return self.items.filter(state=JOB_STATE_ERROR)

    @property
    def active_jobs(self):
        return self.items.filter(state=JOB_STATE_PROCESSING)

    @property
    def items(self):
        return self.jobqueueitem_set

    def empty(self):
        """Delete every item in this queue regardless of state."""
        self.items.all().delete()

    def save(self, *args, **kwargs):
        # Keep the runtime label registry in sync when saving.
        # NOTE(review): ``label`` is a read-only property, so this getattr
        # re-registers the already-registered value — presumably here for
        # queues created without going through the manager; confirm.
        label = getattr(self, 'label', None)
        if label:
            job_queue_labels[self.name] = label
        return super(JobQueue, self).save(*args, **kwargs)

    # TODO: custom runtime methods
    class Meta:
        verbose_name = _(u'job queue')
        verbose_name_plural = _(u'job queues')
class JobQueueItem(models.Model):
    """
    A single queued job: the job type name plus its JSON-serialized
    keyword arguments, with a state machine (pending/processing/error).
    """
    # TODO: add re-queue
    job_queue = models.ForeignKey(JobQueue, verbose_name=_(u'job queue'))
    creation_datetime = models.DateTimeField(verbose_name=_(u'creation datetime'), editable=False)
    # Deduplication key; see save() for how it is derived.
    unique_id = models.CharField(blank=True, max_length=64, verbose_name=_(u'id'), unique=True, editable=False)
    # Name used to look the JobType instance up in job_types_registry.
    job_type = models.CharField(max_length=32, verbose_name=_(u'job type'))
    # JSON-encoded keyword arguments for the job function.
    kwargs = models.TextField(verbose_name=_(u'keyword arguments'))
    state = models.CharField(max_length=4,
        choices=JOB_STATE_CHOICES,
        default=JOB_STATE_PENDING,
        verbose_name=_(u'state'))
    # Error text stored by Job when execution fails.
    result = models.TextField(blank=True, verbose_name=_(u'result'))

    def __unicode__(self):
        return self.unique_id

    def save(self, *args, **kwargs):
        """
        Stamp the creation time and derive ``unique_id``: a SHA-256 of
        job type + kwargs when the queue deduplicates jobs (so an
        identical job collides on the unique column), otherwise a random
        UUID.
        """
        self.creation_datetime = datetime.datetime.now()
        if self.job_queue.unique_jobs:
            self.unique_id = hashlib.sha256(u'%s-%s' % (self.job_type, self.kwargs)).hexdigest()
        else:
            self.unique_id = unicode(uuid.uuid4())
        try:
            super(JobQueueItem, self).save(*args, **kwargs)
        except IntegrityError:
            # Duplicate unique_id: an identical job is already queued.
            # TODO: Maybe replace instead of raising exception w/ replace flag
            raise JobQueuePushError

    def get_job_type(self):
        # May return None if the job type was never registered this run.
        return job_types_registry.get(self.job_type)

    def run(self):
        """Dispatch this item through its registered job type."""
        job_type_instance = self.get_job_type()
        job_type_instance.run(self)

    @property
    def worker(self):
        """The Worker currently processing this item, or None."""
        try:
            return self.worker_set.get()
        except Worker.DoesNotExist:
            return None

    class Meta:
        ordering = ('creation_datetime',)
        verbose_name = _(u'job queue item')
        verbose_name_plural = _(u'job queue items')
class Worker(models.Model):
    """
    A worker process executing a job queue item on a cluster node.

    Rows are created by ``Job.__init__`` when processing starts and
    deleted when it finishes, so this table reflects the jobs currently
    being executed across the cluster.
    """
    node = models.ForeignKey(Node, verbose_name=_(u'node'))
    # OS process id of the worker on its node.
    # Fix: verbose_name previously said u'name' (left over from the old
    # ``name`` field, see migration 0002) and ``max_length`` is
    # meaningless on an integer field.
    pid = models.PositiveIntegerField(verbose_name=_(u'pid'))
    # Fix: datetime defaults must be callables. The original passed
    # ``datetime.datetime.now()`` for ``heartbeat``, freezing the value
    # at module import time (visible as the hard-coded timestamps in the
    # South migration snapshots).
    creation_datetime = models.DateTimeField(verbose_name=_(u'creation datetime'), default=datetime.datetime.now, editable=False)
    heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now, verbose_name=_(u'heartbeat check'))
    state = models.CharField(max_length=4,
        choices=WORKER_STATE_CHOICES,
        default=WORKER_STATE_RUNNING,
        verbose_name=_(u'state'))
    job_queue_item = models.ForeignKey(JobQueueItem, verbose_name=_(u'job queue item'))

    def __unicode__(self):
        return u'%s-%s' % (self.node.hostname, self.pid)

    class Meta:
        ordering = ('creation_datetime',)
        verbose_name = _(u'worker')
        verbose_name_plural = _(u'workers')
"""
class JobProcessingConfig(Singleton):
worker_time_to_live = models.PositiveInteger(verbose_name=(u'time to live (in seconds)') # After this time a worker is considered dead
worker_heartbeat_interval = models.PositiveInteger(verbose_name=(u'heartbeat interval')
def __unicode__(self):
return ugettext('Workers configuration')
class Meta:
verbose_name = verbose_name_plural = _(u'Workers configuration')
"""

View File

@@ -0,0 +1,8 @@
from __future__ import absolute_import
from django.utils.translation import ugettext_lazy as _
from permissions.models import PermissionNamespace, Permission
namespace = PermissionNamespace('job_processor', _(u'Job processor'))
PERMISSION_JOB_QUEUE_VIEW = Permission.objects.register(namespace, 'job_queue_view', _(u'View the job queues in a Mayan cluster'))

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.0 KiB

View File

@@ -0,0 +1,43 @@
from __future__ import absolute_import
import logging
from lock_manager import Lock, LockError
from clustering.models import Node
from .models import JobQueue
from .exceptions import JobQueueNoPendingJobs
LOCK_EXPIRE = 10
MAX_CPU_LOAD = 90.0
MAX_MEMORY_USAGE = 90.0
logger = logging.getLogger(__name__)
def job_queue_poll():
    """
    Dispatch the oldest pending job of every job queue.

    Runs only while this node's CPU load and memory usage are below their
    limits, and only in one process at a time cluster-wide thanks to a
    named lock.
    """
    logger.debug('starting')
    node = Node.objects.myself()
    if node.cpuload >= MAX_CPU_LOAD or node.memory_usage >= MAX_MEMORY_USAGE:
        # Node is overloaded; skip this polling cycle.
        logger.debug('CPU load or memory usage over limit')
        return

    try:
        lock = Lock.acquire_lock(u'job_queue_poll', LOCK_EXPIRE)
    except LockError:
        # Another poller holds the lock; nothing to do.
        return
    # Fix: the original referenced ``lock`` in a broad except clause where
    # it could be unbound (NameError if acquire_lock raised something other
    # than LockError), and leaked the lock whenever dispatching a job
    # raised. try/finally guarantees release in every case.
    try:
        for job_queue in JobQueue.objects.all():
            try:
                job_item = job_queue.get_oldest_pending_job()
                job_item.run()
            except JobQueueNoPendingJobs:
                logger.debug('no pending jobs for job queue: %s' % job_queue)
    finally:
        lock.release()

View File

@@ -0,0 +1,10 @@
from django.conf.urls.defaults import patterns, url
# URL routes for the job processor views; the route names are referenced
# by navigation links and reverse() lookups elsewhere in the project.
urlpatterns = patterns('job_processor.views',
    url(r'^node/(?P<node_pk>\d+)/workers/$', 'node_workers', (), 'node_workers'),
    url(r'^queue/list/$', 'job_queues', (), 'job_queues'),
    # One view renders all three item states, selected via extra kwargs.
    url(r'^queue/(?P<job_queue_pk>\d+)/items/pending/$', 'job_queue_items', {'pending_jobs': True}, 'job_queue_items_pending'),
    url(r'^queue/(?P<job_queue_pk>\d+)/items/error/$', 'job_queue_items', {'error_jobs' :True}, 'job_queue_items_error'),
    url(r'^queue/(?P<job_queue_pk>\d+)/items/active/$', 'job_queue_items', {'active_jobs' :True}, 'job_queue_items_active'),
)

View File

@@ -1 +1,153 @@
# Create your views here.
from __future__ import absolute_import
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext_lazy as _
from django.shortcuts import get_object_or_404
from django.contrib.contenttypes.models import ContentType
from django.db.models.loading import get_model
from django.http import Http404
from django.core.exceptions import PermissionDenied
from permissions.models import Permission
from common.utils import encapsulate
from acls.models import AccessEntry
from clustering.permissions import PERMISSION_NODES_VIEW
from clustering.models import Node
from .models import JobQueue
from .permissions import PERMISSION_JOB_QUEUE_VIEW
def node_workers(request, node_pk):
    """Render the list of worker processes running on a cluster node."""
    node = get_object_or_404(Node, pk=node_pk)
    try:
        Permission.objects.check_permissions(request.user, [PERMISSION_NODES_VIEW])
    except PermissionDenied:
        # Fall back to a per-object ACL check.
        AccessEntry.objects.check_access(PERMISSION_NODES_VIEW, request.user, node)

    # (column header, attribute looked up on each Worker) pairs.
    column_spec = [
        (_(u'created'), 'creation_datetime'),
        (_(u'heartbeat'), 'heartbeat'),
        (_(u'state'), 'get_state_display'),
        (_(u'job queue item'), 'job_queue_item'),
        (_(u'job type'), 'job_queue_item.get_job_type'),
        (_(u'job queue'), 'job_queue_item.job_queue'),
    ]
    context = {
        'object_list': node.workers().all(),
        'title': _(u'workers for node: %s') % node,
        'object': node,
        'hide_link': True,
        'extra_columns': [
            {'name': header, 'attribute': attribute}
            for header, attribute in column_spec
        ],
    }
    return render_to_response('generic_list.html', context,
        context_instance=RequestContext(request))
def job_queues(request):
    """Render every job queue with its per-state job counts."""
    # TODO: permission list filtering
    Permission.objects.check_permissions(request.user, [PERMISSION_JOB_QUEUE_VIEW])

    # (column header, attribute looked up on each JobQueue) pairs.
    count_columns = [
        (_(u'pending jobs'), 'pending_jobs.count'),
        (_(u'active jobs'), 'active_jobs.count'),
        (_(u'error jobs'), 'error_jobs.count'),
    ]
    context = {
        'object_list': JobQueue.objects.all(),
        'title': _(u'job queue'),
        'hide_link': True,
        'extra_columns': [
            {'name': header, 'attribute': attribute}
            for header, attribute in count_columns
        ],
    }
    return render_to_response('generic_list.html', context,
        context_instance=RequestContext(request))
def job_queue_items(request, job_queue_pk, pending_jobs=False, error_jobs=False, active_jobs=False):
    """
    Render the items of a job queue filtered by state.

    Exactly one of ``pending_jobs``, ``error_jobs`` or ``active_jobs`` is
    expected to be True (see urls.py); if several are passed the last one
    checked wins, matching the original behavior.
    """
    job_queue = get_object_or_404(JobQueue, pk=job_queue_pk)
    try:
        Permission.objects.check_permissions(request.user, [PERMISSION_JOB_QUEUE_VIEW])
    except PermissionDenied:
        AccessEntry.objects.check_access(PERMISSION_JOB_QUEUE_VIEW, request.user, job_queue)

    # Fix: the original left ``title`` unassigned when no state flag was
    # passed, raising NameError instead of rendering an empty list.
    jobs = set()
    title = _(u'jobs for queue: %s') % job_queue
    if pending_jobs:
        jobs = job_queue.pending_jobs.all()
        title = _(u'pending jobs for queue: %s') % job_queue
    if error_jobs:
        jobs = job_queue.error_jobs.all()
        title = _(u'error jobs for queue: %s') % job_queue
    if active_jobs:
        jobs = job_queue.active_jobs.all()
        title = _(u'active jobs for queue: %s') % job_queue

    context = {
        'object_list': jobs,
        'title': title,
        'object': job_queue,
        'hide_link': True,
        'extra_columns': [
            {
                'name': _(u'created'),
                'attribute': 'creation_datetime',
            },
            {
                'name': _(u'job type'),
                'attribute': 'get_job_type',
            },
            {
                'name': _(u'arguments'),
                'attribute': 'kwargs',
            },
        ],
    }
    if active_jobs:
        # Show which worker is processing each active item.
        context['extra_columns'].append(
            {
                'name': _(u'worker'),
                'attribute': encapsulate(lambda x: x.worker or _(u'Unknown')),
            }
        )
    if error_jobs:
        # Show the stored error text for failed items.
        context['extra_columns'].append(
            {
                'name': _(u'result'),
                'attribute': 'result',
            }
        )
    return render_to_response('generic_list.html', context,
        context_instance=RequestContext(request))

View File

@@ -1,6 +1,10 @@
from __future__ import absolute_import
import logging
from .exceptions import LockError
from .models import Lock as LockModel
logger = logging.getLogger(__name__)
Lock = LockModel.objects

View File

@@ -0,0 +1,33 @@
from __future__ import absolute_import
from functools import wraps
from . import logger
from . import Lock
from .exceptions import LockError
def simple_locking(lock_id, expiration=None):
    """
    A decorator that wraps a function in a single lock getting algorithm.

    If the lock cannot be acquired the wrapped function is skipped and
    the wrapper returns None; otherwise the function's return value is
    passed through and the lock is always released afterwards.
    """
    def inner_decorator(function):
        def wrapper(*args, **kwargs):
            try:
                # Trying to acquire lock
                lock = Lock.acquire_lock(lock_id, expiration)
            except LockError:
                # Unable to acquire lock; skip this invocation.
                return None
            # Fix: the original had an ``except Exception`` clause that
            # called ``lock.release()`` where ``lock`` could be unbound
            # (NameError), and never released the lock if the wrapped
            # function itself raised. try/finally guarantees release.
            logger.debug('acquired lock: %s' % lock_id)
            try:
                return function(*args, **kwargs)
            finally:
                lock.release()
        return wraps(function)(wrapper)
    return inner_decorator

View File

@@ -31,7 +31,7 @@ class LockManager(models.Manager):
except self.model.DoesNotExist:
# Table based locking
logger.debug('lock: %s does not exist' % name)
raise LockError('Unable to acquire lock')
raise LockError('unable to acquire lock: %s' % name)
if datetime.datetime.now() > lock.creation_datetime + datetime.timedelta(seconds=lock.timeout):
logger.debug('reseting deleting stale lock: %s' % name)

View File

@@ -2,7 +2,8 @@ from __future__ import absolute_import
import datetime
from django.db import models
from django.db import close_connection
from django.db import (models, transaction, DatabaseError)
from django.utils.translation import ugettext_lazy as _
from .managers import LockManager
@@ -26,13 +27,17 @@ class Lock(models.Model):
super(Lock, self).save(*args, **kwargs)
    @transaction.commit_on_success
    def release(self):
        """
        Delete this lock's database row, tolerating the case where the
        lock already expired and was re-created by another holder.
        """
        # NOTE(review): presumably discards a possibly stale shared
        # connection before touching the lock table (this runs in forked
        # worker processes) — confirm.
        close_connection()
        try:
            lock = Lock.objects.get(name=self.name, creation_datetime=self.creation_datetime)
            lock.delete()
        except Lock.DoesNotExist:
            # Lock expired and was reassigned; nothing left to release.
            pass
        except DatabaseError:
            transaction.rollback()
class Meta:
verbose_name = _(u'lock')

View File

@@ -158,7 +158,11 @@ def bind_links(sources, links, menu_name=None, position=0):
bound_links.setdefault(menu_name, {})
for source in sources:
bound_links[menu_name].setdefault(source, {'links': []})
bound_links[menu_name][source]['links'].extend(links)
try:
bound_links[menu_name][source]['links'].extend(links)
except TypeError:
# Try to see if links is a single link
bound_links[menu_name][source]['links'].append(links)
def register_top_menu(name, link, position=None):

View File

@@ -15,49 +15,42 @@ from documents.models import Document, DocumentVersion
from maintenance.api import register_maintenance_links
from project_tools.api import register_tool
from acls.api import class_permissions
from scheduler.api import register_interval_job
from statistics.api import register_statistics
from job_processor.models import JobQueue, JobType
from job_processor.exceptions import JobQueuePushError
from .conf.settings import (AUTOMATIC_OCR, QUEUE_PROCESSING_INTERVAL)
from .models import DocumentQueue, QueueTransformation
from .tasks import task_process_document_queues
from .models import OCRProcessingSingleton
from .api import do_document_ocr
from .permissions import PERMISSION_OCR_DOCUMENT
from .exceptions import AlreadyQueued
from . import models as ocr_models
from .statistics import get_statistics
from .literals import OCR_QUEUE_NAME
logger = logging.getLogger(__name__)
ocr_job_queue = None
from .links import (submit_document, re_queue_multiple_document,
queue_document_multiple_delete, document_queue_disable,
document_queue_enable, all_document_ocr_cleanup, queue_document_list,
ocr_tool_link, setup_queue_transformation_list,
setup_queue_transformation_create, setup_queue_transformation_edit,
setup_queue_transformation_delete, submit_document_multiple)
queue_document_multiple_delete, ocr_disable,
ocr_enable, all_document_ocr_cleanup, ocr_log,
ocr_tool_link, submit_document_multiple)
bind_links([Document], [submit_document])
bind_links([DocumentQueue], [document_queue_disable, document_queue_enable, setup_queue_transformation_list])
bind_links([QueueTransformation], [setup_queue_transformation_edit, setup_queue_transformation_delete])
register_multi_item_links(['queue_document_list'], [re_queue_multiple_document, queue_document_multiple_delete])
bind_links(['setup_queue_transformation_create', 'setup_queue_transformation_edit', 'setup_queue_transformation_delete', 'document_queue_disable', 'document_queue_enable', 'queue_document_list', 'setup_queue_transformation_list'], [queue_document_list], menu_name='secondary_menu')
bind_links(['setup_queue_transformation_edit', 'setup_queue_transformation_delete', 'setup_queue_transformation_list', 'setup_queue_transformation_create'], [setup_queue_transformation_create], menu_name='sidebar')
bind_links([OCRProcessingSingleton], [ocr_disable, ocr_enable])
#register_multi_item_links(['queue_document_list'], [re_queue_multiple_document, queue_document_multiple_delete])
register_maintenance_links([all_document_ocr_cleanup], namespace='ocr', title=_(u'OCR'))
register_multi_item_links(['folder_view', 'search', 'results', 'index_instance_node_view', 'document_find_duplicates', 'document_type_document_list', 'document_group_view', 'document_list', 'document_list_recent'], [submit_document_multiple])
@transaction.commit_on_success
def create_default_queue():
def create_ocr_job_queue():
global ocr_job_queue
try:
default_queue, created = DocumentQueue.objects.get_or_create(name='default')
ocr_job_queue, created = JobQueue.objects.get_or_create(name=OCR_QUEUE_NAME, defaults={'label': _('OCR'), 'unique_jobs': True})
except DatabaseError:
transaction.rollback()
else:
if created:
default_queue.label = ugettext(u'Default')
default_queue.save()
@receiver(post_save, dispatch_uid='document_post_save', sender=DocumentVersion)
@@ -67,8 +60,8 @@ def document_post_save(sender, instance, **kwargs):
if kwargs.get('created', False):
if AUTOMATIC_OCR:
try:
DocumentQueue.objects.queue_document(instance.document)
except AlreadyQueued:
instance.submit_for_ocr()
except JobQueuePushError:
pass
# Disabled because it appears Django execute signals using the same
@@ -80,17 +73,15 @@ def document_post_save(sender, instance, **kwargs):
# logger.debug('got call_queue signal: %s' % kwargs)
# task_process_document_queues()
@receiver(post_syncdb, dispatch_uid='create_default_queue', sender=ocr_models)
def create_default_queue_signal_handler(sender, **kwargs):
create_default_queue()
register_interval_job('task_process_document_queues', _(u'Checks the OCR queue for pending documents.'), task_process_document_queues, seconds=QUEUE_PROCESSING_INTERVAL)
register_tool(ocr_tool_link)
class_permissions(Document, [
PERMISSION_OCR_DOCUMENT,
])
register_statistics(get_statistics)
#register_statistics(get_statistics)
create_ocr_job_queue()
ocr_job_type = JobType('ocr', _(u'OCR'), do_document_ocr)
Document.add_to_class('submit_for_ocr', lambda document: ocr_job_queue.push(ocr_job_type, document_version_pk=document.latest_version.pk))
DocumentVersion.add_to_class('submit_for_ocr', lambda document_version: ocr_job_queue.push(ocr_job_type, document_version_pk=document_version.pk))

View File

@@ -1,20 +0,0 @@
from __future__ import absolute_import
from django.contrib import admin
from .models import DocumentQueue, QueueDocument
class QueueDocumentInline(admin.StackedInline):
model = QueueDocument
extra = 1
classes = ('collapse-open',)
allow_add = True
class DocumentQueueAdmin(admin.ModelAdmin):
inlines = [QueueDocumentInline]
list_display = ('name', 'label', 'state')
admin.site.register(DocumentQueue, DocumentQueueAdmin)

View File

@@ -12,7 +12,7 @@ from django.utils.importlib import import_module
from common.conf.settings import TEMPORARY_DIRECTORY
from converter.api import convert
from documents.models import DocumentPage
from documents.models import DocumentPage, DocumentVersion
from .conf.settings import (TESSERACT_PATH, TESSERACT_LANGUAGE, UNPAPER_PATH)
from .exceptions import TesseractError, UnpaperError
@@ -81,25 +81,25 @@ def run_tesseract(input_filename, lang=None):
return text
def do_document_ocr(queue_document):
def do_document_ocr(document_version_pk):
"""
Try first to extract text from document pages using the registered
parser, if the parser fails or if there is no parser registered for
the document mimetype do a visual OCR by calling tesseract
"""
for document_page in queue_document.document.pages.all():
document_version = DocumentVersion.objects.get(pk=document_version_pk)
for document_page in document_version.pages.all():
try:
# Try to extract text by means of a parser
parse_document_page(document_page)
except (ParserError, ParserUnknownFile):
# Fall back to doing visual OCR
ocr_transformations, warnings = queue_document.get_transformation_list()
document_filepath = document_page.document.get_image_cache_name(page=document_page.page_number, version=document_page.document_version.pk)
unpaper_output_filename = u'%s_unpaper_out_page_%s%s%s' % (document_page.document.uuid, document_page.page_number, os.extsep, UNPAPER_FILE_FORMAT)
unpaper_output_filepath = os.path.join(TEMPORARY_DIRECTORY, unpaper_output_filename)
unpaper_input = convert(document_filepath, file_format=UNPAPER_FILE_FORMAT, transformations=ocr_transformations)
unpaper_input = convert(document_filepath, file_format=UNPAPER_FILE_FORMAT)
execute_unpaper(input_filepath=unpaper_input, output_filepath=unpaper_output_filepath)
#from PIL import Image, ImageOps

View File

@@ -21,3 +21,11 @@ class UnpaperError(Exception):
class ReQueueError(Exception):
    """Raised when a document cannot be re-queued for OCR."""
    pass
class OCRProcessingAlreadyDisabled(Exception):
    """Raised when disabling OCR processing while it is already disabled."""
    pass
class OCRProcessingAlreadyEnabled(Exception):
    """Raised when enabling OCR processing while it is already enabled."""
    pass

View File

@@ -1,21 +0,0 @@
from __future__ import absolute_import
from django import forms
from .models import QueueTransformation
class QueueTransformationForm(forms.ModelForm):
class Meta:
model = QueueTransformation
def __init__(self, *args, **kwargs):
super(QueueTransformationForm, self).__init__(*args, **kwargs)
self.fields['content_type'].widget = forms.HiddenInput()
self.fields['object_id'].widget = forms.HiddenInput()
class QueueTransformationForm_create(forms.ModelForm):
class Meta:
model = QueueTransformation
exclude = ('content_type', 'object_id')

View File

@@ -7,23 +7,26 @@ from navigation.api import Link
from .permissions import (PERMISSION_OCR_DOCUMENT,
PERMISSION_OCR_DOCUMENT_DELETE, PERMISSION_OCR_QUEUE_ENABLE_DISABLE,
PERMISSION_OCR_CLEAN_ALL_PAGES)
from .models import OCRProcessingSingleton
submit_document = Link(text=_('submit to OCR queue'), view='submit_document', args='object.id', sprite='hourglass_add', permissions=[PERMISSION_OCR_DOCUMENT])
submit_document_multiple = Link(text=_('submit to OCR queue'), view='submit_document_multiple', sprite='hourglass_add', permissions=[PERMISSION_OCR_DOCUMENT])
def is_enabled(context):
    """Link condition callback: True while OCR processing is enabled."""
    ocr_processing = OCRProcessingSingleton.get()
    return ocr_processing.is_enabled()
def is_disabled(context):
    """Link condition callback: True while OCR processing is disabled."""
    ocr_processing = OCRProcessingSingleton.get()
    return not ocr_processing.is_enabled()
ocr_log = Link(text=_(u'queue document list'), view='ocr_log', sprite='text', permissions=[PERMISSION_OCR_DOCUMENT])
ocr_disable = Link(text=_(u'disable OCR processing'), view='ocr_disable', sprite='control_stop_blue', permissions=[PERMISSION_OCR_QUEUE_ENABLE_DISABLE], conditional_disable=is_disabled)
ocr_enable = Link(text=_(u'enable OCR processing'), view='ocr_enable', sprite='control_play_blue', permissions=[PERMISSION_OCR_QUEUE_ENABLE_DISABLE], conditional_disable=is_enabled)
submit_document = Link(text=_('submit to OCR queue'), view='submit_document', args='object.id', sprite='text_dropcaps', permissions=[PERMISSION_OCR_DOCUMENT])
submit_document_multiple = Link(text=_('submit to OCR queue'), view='submit_document_multiple', sprite='text_dropcaps', permissions=[PERMISSION_OCR_DOCUMENT])
re_queue_document = Link(text=_('re-queue'), view='re_queue_document', args='object.id', sprite='hourglass_add', permissions=[PERMISSION_OCR_DOCUMENT])
re_queue_multiple_document = Link(text=_('re-queue'), view='re_queue_multiple_document', sprite='hourglass_add', permissions=[PERMISSION_OCR_DOCUMENT])
queue_document_delete = Link(text=_(u'delete'), view='queue_document_delete', args='object.id', sprite='hourglass_delete', permissions=[PERMISSION_OCR_DOCUMENT_DELETE])
queue_document_multiple_delete = Link(text=_(u'delete'), view='queue_document_multiple_delete', sprite='hourglass_delete', permissions=[PERMISSION_OCR_DOCUMENT_DELETE])
document_queue_disable = Link(text=_(u'stop queue'), view='document_queue_disable', args='queue.id', sprite='control_stop_blue', permissions=[PERMISSION_OCR_QUEUE_ENABLE_DISABLE])
document_queue_enable = Link(text=_(u'activate queue'), view='document_queue_enable', args='queue.id', sprite='control_play_blue', permissions=[PERMISSION_OCR_QUEUE_ENABLE_DISABLE])
all_document_ocr_cleanup = Link(text=_(u'clean up pages content'), view='all_document_ocr_cleanup', sprite='text_strikethrough', permissions=[PERMISSION_OCR_CLEAN_ALL_PAGES], description=_(u'Runs a language filter to remove common OCR mistakes from document pages content.'))
queue_document_list = Link(text=_(u'queue document list'), view='queue_document_list', sprite='hourglass', permissions=[PERMISSION_OCR_DOCUMENT])
ocr_tool_link = Link(text=_(u'OCR'), view='queue_document_list', sprite='hourglass', icon='text.png', permissions=[PERMISSION_OCR_DOCUMENT], children_view_regex=[r'queue_', r'document_queue'])
setup_queue_transformation_list = Link(text=_(u'transformations'), view='setup_queue_transformation_list', args='queue.pk', sprite='shape_move_front')
setup_queue_transformation_create = Link(text=_(u'add transformation'), view='setup_queue_transformation_create', args='queue.pk', sprite='shape_square_add')
setup_queue_transformation_edit = Link(text=_(u'edit'), view='setup_queue_transformation_edit', args='transformation.pk', sprite='shape_square_edit')
setup_queue_transformation_delete = Link(text=_(u'delete'), view='setup_queue_transformation_delete', args='transformation.pk', sprite='shape_square_delete')
ocr_tool_link = Link(text=_(u'OCR'), view='ocr_log', sprite='hourglass', icon='text.png', permissions=[PERMISSION_OCR_DOCUMENT]) # children_view_regex=[r'queue_', r'document_queue'])

View File

@@ -1,25 +1,16 @@
from django.utils.translation import ugettext_lazy as _
DOCUMENTQUEUE_STATE_STOPPED = 's'
DOCUMENTQUEUE_STATE_ACTIVE = 'a'
OCR_STATE_DISABLED = 'd'
OCR_STATE_ENABLED = 'e'
DOCUMENTQUEUE_STATE_CHOICES = (
(DOCUMENTQUEUE_STATE_STOPPED, _(u'stopped')),
(DOCUMENTQUEUE_STATE_ACTIVE, _(u'active')),
)
QUEUEDOCUMENT_STATE_PENDING = 'p'
QUEUEDOCUMENT_STATE_PROCESSING = 'i'
QUEUEDOCUMENT_STATE_ERROR = 'e'
QUEUEDOCUMENT_STATE_CHOICES = (
(QUEUEDOCUMENT_STATE_PENDING, _(u'pending')),
(QUEUEDOCUMENT_STATE_PROCESSING, _(u'processing')),
(QUEUEDOCUMENT_STATE_ERROR, _(u'error')),
OCR_STATE_CHOICES = (
(OCR_STATE_DISABLED, _(u'disabled')),
(OCR_STATE_ENABLED, _(u'enabled')),
)
DEFAULT_OCR_FILE_FORMAT = u'tiff'
DEFAULT_OCR_FILE_EXTENSION = u'tif'
UNPAPER_FILE_FORMAT = u'ppm'
OCR_QUEUE_NAME = 'ocr'

View File

@@ -2,19 +2,19 @@ from __future__ import absolute_import
from django.db import models
from .exceptions import AlreadyQueued
#from .exceptions import AlreadyQueued
class DocumentQueueManager(models.Manager):
'''
Module manager class to handle adding documents to an OCR document
queue
'''
def queue_document(self, document, queue_name='default'):
document_queue = self.model.objects.get(name=queue_name)
if document_queue.queuedocument_set.filter(document=document):
raise AlreadyQueued
class OCRProcessingManager(models.Manager):
"""
Module manager class to handle adding documents to an OCR queue
"""
def queue_document(self, document):
pass
#document_queue = self.model.objects.get(name=queue_name)
#if document_queue.queuedocument_set.filter(document_version=document.latest_version):
# raise AlreadyQueued
document_queue.queuedocument_set.create(document=document, delay=True)
#document_queue.queuedocument_set.create(document_version=document.latest_version, delay=True)
return document_queue
#return document_queue

View File

@@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
    """
    South schema migration: drop the multi-queue OCR models
    (DocumentQueue / QueueDocument / QueueTransformation) and replace
    them with a single OCRProcessingSingleton row that only tracks an
    enabled/disabled state.
    """

    def forwards(self, orm):
        # Deleting model 'DocumentQueue'
        db.delete_table('ocr_documentqueue')

        # Deleting model 'QueueDocument'
        db.delete_table('ocr_queuedocument')

        # Deleting model 'QueueTransformation'
        db.delete_table('ocr_queuetransformation')

        # Adding model 'OCRProcessingSingleton'
        # NOTE(review): lock_id default is the integer 1 on a CharField;
        # presumably coerced to u'1' by the Singleton base model — confirm.
        db.create_table('ocr_ocrprocessingsingleton', (
            ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
            ('lock_id', self.gf('django.db.models.fields.CharField')(default=1, unique=True, max_length=1)),
            ('state', self.gf('django.db.models.fields.CharField')(default='a', max_length=4)),
        ))
        db.send_create_signal('ocr', ['OCRProcessingSingleton'])

    def backwards(self, orm):
        # Re-create the dropped queue tables so the migration is reversible.
        # Adding model 'DocumentQueue'
        db.create_table('ocr_documentqueue', (
            ('state', self.gf('django.db.models.fields.CharField')(default='a', max_length=4)),
            ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
            ('name', self.gf('django.db.models.fields.CharField')(max_length=64, unique=True)),
            ('label', self.gf('django.db.models.fields.CharField')(max_length=64)),
        ))
        db.send_create_signal('ocr', ['DocumentQueue'])

        # Adding model 'QueueDocument'
        db.create_table('ocr_queuedocument', (
            ('delay', self.gf('django.db.models.fields.BooleanField')(default=False)),
            ('state', self.gf('django.db.models.fields.CharField')(default='p', max_length=4)),
            ('result', self.gf('django.db.models.fields.TextField')(null=True, blank=True)),
            ('datetime_submitted', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, blank=True, db_index=True)),
            ('document_queue', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['ocr.DocumentQueue'])),
            ('document_version', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['documents.DocumentVersion'])),
            ('document', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['documents.Document'])),
            ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
            ('node_name', self.gf('django.db.models.fields.CharField')(max_length=32, null=True, blank=True)),
        ))
        db.send_create_signal('ocr', ['QueueDocument'])

        # Adding model 'QueueTransformation'
        db.create_table('ocr_queuetransformation', (
            ('object_id', self.gf('django.db.models.fields.PositiveIntegerField')()),
            ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
            ('arguments', self.gf('django.db.models.fields.TextField')(null=True, blank=True)),
            ('content_type', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['contenttypes.ContentType'])),
            ('order', self.gf('django.db.models.fields.PositiveIntegerField')(default=0, null=True, blank=True, db_index=True)),
            ('transformation', self.gf('django.db.models.fields.CharField')(max_length=128)),
        ))
        db.send_create_signal('ocr', ['QueueTransformation'])

        # Deleting model 'OCRProcessingSingleton'
        db.delete_table('ocr_ocrprocessingsingleton')

    # NOTE(review): backwards() references orm['documents.*'] and
    # orm['contenttypes.ContentType'] but the frozen models dict below
    # only contains the ocr app — this may be a truncation of the
    # rendered diff; verify against the actual migration file.
    models = {
        'ocr.ocrprocessingsingleton': {
            'Meta': {'object_name': 'OCRProcessingSingleton'},
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'}),
            'state': ('django.db.models.fields.CharField', [], {'default': "'a'", 'max_length': '4'})
        }
    }

    complete_apps = ['ocr']

View File

@@ -1,7 +1,7 @@
from __future__ import absolute_import
from ast import literal_eval
from datetime import datetime
import datetime
from django.db import models
from django.utils.translation import ugettext_lazy as _
@@ -11,113 +11,45 @@ from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes import generic
from django.core.exceptions import ValidationError
from documents.models import Document
from common.models import Singleton
from documents.models import Document, DocumentVersion
from converter.api import get_available_transformations_choices
from sources.managers import SourceTransformationManager
from .literals import (DOCUMENTQUEUE_STATE_CHOICES,
QUEUEDOCUMENT_STATE_PENDING, QUEUEDOCUMENT_STATE_CHOICES,
QUEUEDOCUMENT_STATE_PROCESSING, DOCUMENTQUEUE_STATE_ACTIVE)
from .managers import DocumentQueueManager
from .exceptions import ReQueueError
from .literals import (OCR_STATE_CHOICES, OCR_STATE_ENABLED,
OCR_STATE_DISABLED)
from .managers import OCRProcessingManager
from .exceptions import (ReQueueError, OCRProcessingAlreadyDisabled,
OCRProcessingAlreadyEnabled)
class DocumentQueue(models.Model):
name = models.CharField(max_length=64, unique=True, verbose_name=_(u'name'))
label = models.CharField(max_length=64, verbose_name=_(u'label'))
class OCRProcessingSingleton(Singleton):
state = models.CharField(max_length=4,
choices=DOCUMENTQUEUE_STATE_CHOICES,
default=DOCUMENTQUEUE_STATE_ACTIVE,
choices=OCR_STATE_CHOICES,
default=OCR_STATE_ENABLED,
verbose_name=_(u'state'))
objects = DocumentQueueManager()
class Meta:
verbose_name = _(u'document queue')
verbose_name_plural = _(u'document queues')
#objects = AnonymousUserSingletonManager()
def __unicode__(self):
return self.label
return ugettext('OCR processing')
def disable(self):
if self.state == OCR_STATE_DISABLED:
raise OCRProcessingAlreadyDisabled
self.state = OCR_STATE_DISABLED
self.save()
class QueueDocument(models.Model):
document_queue = models.ForeignKey(DocumentQueue, verbose_name=_(u'document queue'))
document = models.ForeignKey(Document, verbose_name=_(u'document'))
datetime_submitted = models.DateTimeField(verbose_name=_(u'date time submitted'), auto_now_add=True, db_index=True)
delay = models.BooleanField(verbose_name=_(u'delay ocr'), default=False)
state = models.CharField(max_length=4,
choices=QUEUEDOCUMENT_STATE_CHOICES,
default=QUEUEDOCUMENT_STATE_PENDING,
verbose_name=_(u'state'))
result = models.TextField(blank=True, null=True, verbose_name=_(u'result'))
node_name = models.CharField(max_length=32, verbose_name=_(u'node name'), blank=True, null=True)
def enable(self):
if self.state == OCR_STATE_ENABLED:
raise OCRProcessingAlreadyEnabled
self.state = OCR_STATE_ENABLED
self.save()
def is_enabled(self):
return self.state == OCR_STATE_ENABLED
class Meta:
ordering = ('datetime_submitted',)
verbose_name = _(u'queue document')
verbose_name_plural = _(u'queue documents')
def get_transformation_list(self):
return QueueTransformation.transformations.get_for_object_as_list(self)
def requeue(self):
if self.state == QUEUEDOCUMENT_STATE_PROCESSING:
raise ReQueueError
else:
self.datetime_submitted = datetime.now()
self.state = QUEUEDOCUMENT_STATE_PENDING
self.delay = False
self.result = None
self.node_name = None
self.save()
def __unicode__(self):
try:
return unicode(self.document)
except ObjectDoesNotExist:
return ugettext(u'Missing document.')
class ArgumentsValidator(object):
    """Validator accepting only strings parseable by ast.literal_eval.

    Used on free-form "arguments" text fields to guarantee the stored
    value can later be evaluated safely as a Python literal.
    """

    # Class-level defaults; instances may override either via __init__.
    message = _(u'Enter a valid value.')
    code = 'invalid'

    def __init__(self, message=None, code=None):
        """Optionally override the default error message and/or code."""
        for attribute, override in (('message', message), ('code', code)):
            if override is not None:
                setattr(self, attribute, override)

    def __call__(self, value):
        """Raise ValidationError unless *value* (stripped) is a literal."""
        candidate = value.strip()
        try:
            literal_eval(candidate)
        except (ValueError, SyntaxError):
            raise ValidationError(self.message, code=self.code)
class QueueTransformation(models.Model):
'''
Model that stores the transformation and transformation arguments
for a given document queue
'''
content_type = models.ForeignKey(ContentType)
object_id = models.PositiveIntegerField()
content_object = generic.GenericForeignKey('content_type', 'object_id')
order = models.PositiveIntegerField(default=0, blank=True, null=True, verbose_name=_(u'order'), db_index=True)
transformation = models.CharField(choices=get_available_transformations_choices(), max_length=128, verbose_name=_(u'transformation'))
arguments = models.TextField(blank=True, null=True, verbose_name=_(u'arguments'), help_text=_(u'Use dictionaries to indentify arguments, example: %s') % u'{\'degrees\':90}', validators=[ArgumentsValidator()])
objects = models.Manager()
transformations = SourceTransformationManager()
def __unicode__(self):
return self.get_transformation_display()
class Meta:
ordering = ('order',)
verbose_name = _(u'document queue transformation')
verbose_name_plural = _(u'document queue transformations')
verbose_name = verbose_name_plural = _(u'OCR processing properties')

View File

@@ -7,6 +7,6 @@ from permissions.models import Permission, PermissionNamespace
ocr_namespace = PermissionNamespace('ocr', _(u'OCR'))
PERMISSION_OCR_DOCUMENT = Permission.objects.register(ocr_namespace, 'ocr_document', _(u'Submit documents for OCR'))
PERMISSION_OCR_DOCUMENT_DELETE = Permission.objects.register(ocr_namespace, 'ocr_document_delete', _(u'Delete documents from OCR queue'))
PERMISSION_OCR_QUEUE_ENABLE_DISABLE = Permission.objects.register(ocr_namespace, 'ocr_queue_enable_disable', _(u'Can enable/disable the OCR queue'))
PERMISSION_OCR_QUEUE_ENABLE_DISABLE = Permission.objects.register(ocr_namespace, 'ocr_queue_enable_disable', _(u'Can enable/disable the OCR processing'))
PERMISSION_OCR_CLEAN_ALL_PAGES = Permission.objects.register(ocr_namespace, 'ocr_clean_all_pages', _(u'Can execute the OCR clean up on all document pages'))
PERMISSION_OCR_QUEUE_EDIT = Permission.objects.register(ocr_namespace, 'ocr_queue_edit', _(u'Can edit an OCR queue properties'))

View File

@@ -2,7 +2,7 @@ from __future__ import absolute_import
from django.utils.translation import ugettext as _
from .models import DocumentQueue, QueueDocument
#from .models import DocumentQueue, QueueDocument
def get_statistics():

View File

@@ -1,75 +0,0 @@
from __future__ import absolute_import
from datetime import timedelta, datetime
import platform
import logging
from django.db.models import Q
from job_processor.api import process_job
from lock_manager import Lock, LockError
from .api import do_document_ocr
from .literals import (QUEUEDOCUMENT_STATE_PENDING,
QUEUEDOCUMENT_STATE_PROCESSING, DOCUMENTQUEUE_STATE_ACTIVE,
QUEUEDOCUMENT_STATE_ERROR)
from .models import QueueDocument, DocumentQueue
from .conf.settings import NODE_CONCURRENT_EXECUTION, REPLICATION_DELAY
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
# TODO: Tie LOCK_EXPIRATION with hard task timeout
logger = logging.getLogger(__name__)
def task_process_queue_document(queue_document_id):
    """Run OCR for a single queued document under a distributed lock.

    The lock is keyed on the queue document pk so only one node may
    process a given entry at a time; on success the queue entry is
    deleted, on failure it is kept with the error stored in .result.
    """
    lock_id = u'task_proc_queue_doc-%d' % queue_document_id
    try:
        logger.debug('trying to acquire lock: %s' % lock_id)
        # Raises LockError when another node already holds the lock.
        lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
        logger.debug('acquired lock: %s' % lock_id)
        queue_document = QueueDocument.objects.get(pk=queue_document_id)
        # Mark the entry in-progress and record which node claimed it.
        queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
        queue_document.node_name = platform.node()
        queue_document.save()
        try:
            do_document_ocr(queue_document)
            # Success: the queue entry is consumed.
            queue_document.delete()
        except Exception, e:  # Python 2 except syntax; broad on purpose so
            # any OCR failure is recorded instead of crashing the worker.
            queue_document.state = QUEUEDOCUMENT_STATE_ERROR
            queue_document.result = e
            queue_document.save()
        lock.release()
    except LockError:
        # Another node is processing this entry; skip quietly.
        logger.debug('unable to obtain lock')
        pass
def task_process_document_queues():
    """Scan active document queues and dispatch pending OCR work.

    For each active queue, if this node is below its concurrency limit
    (NODE_CONCURRENT_EXECUTION), the oldest eligible pending document is
    handed to task_process_queue_document via process_job. Delayed
    entries only become eligible after REPLICATION_DELAY seconds.

    Fixes: log-message typo ('maximun' -> 'maximum'); use the
    Python 2.6+/3 compatible ``except ... as e`` syntax.
    """
    logger.debug('executed')
    # TODO: reset_orphans()
    q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
    q_delayed = Q(delay=True)
    # A delayed entry is eligible once REPLICATION_DELAY seconds have passed.
    q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY))

    for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
        # How many entries this node is already processing, across queues.
        current_local_processing_count = QueueDocument.objects.filter(
            state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
            node_name=platform.node()).count()
        if current_local_processing_count < NODE_CONCURRENT_EXECUTION:
            try:
                oldest_queued_document_qs = document_queue.queuedocument_set.filter(
                    (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
                if oldest_queued_document_qs:
                    oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
                    process_job(task_process_queue_document, oldest_queued_document.pk)
            except Exception as e:
                logger.error('unhandled exception: %s' % e)
            finally:
                # Don't process anymore from this queryset, might be stale
                break
        else:
            logger.debug('already processing maximum')
    else:
        # for-else: reached only when no queue triggered the break above.
        logger.debug('nothing to process')

View File

@@ -1,21 +1,17 @@
from django.conf.urls.defaults import patterns, url
urlpatterns = patterns('ocr.views',
url(r'^log/$', 'ocr_log', (), 'ocr_log'),
url(r'^processing/enable/$', 'ocr_enable', (), 'ocr_enable'),
url(r'^processing/disable/$', 'ocr_disable', (), 'ocr_disable'),
url(r'^document/(?P<document_id>\d+)/submit/$', 'submit_document', (), 'submit_document'),
url(r'^document/multiple/submit/$', 'submit_document_multiple', (), 'submit_document_multiple'),
url(r'^queue/document/list/$', 'queue_document_list', (), 'queue_document_list'),
url(r'^queue/document/(?P<queue_document_id>\d+)/delete/$', 'queue_document_delete', (), 'queue_document_delete'),
url(r'^queue/document/multiple/delete/$', 'queue_document_multiple_delete', (), 'queue_document_multiple_delete'),
url(r'^queue/document/(?P<queue_document_id>\d+)/re-queue/$', 're_queue_document', (), 're_queue_document'),
url(r'^queue/document/multiple/re-queue/$', 're_queue_multiple_document', (), 're_queue_multiple_document'),
url(r'^queue/(?P<document_queue_id>\d+)/enable/$', 'document_queue_enable', (), 'document_queue_enable'),
url(r'^queue/(?P<document_queue_id>\d+)/disable/$', 'document_queue_disable', (), 'document_queue_disable'),
#url(r'^queue/document/(?P<queue_document_id>\d+)/delete/$', 'queue_document_delete', (), 'queue_document_delete'),
#url(r'^queue/document/multiple/delete/$', 'queue_document_multiple_delete', (), 'queue_document_multiple_delete'),
#url(r'^queue/document/(?P<queue_document_id>\d+)/re-queue/$', 're_queue_document', (), 're_queue_document'),
#url(r'^queue/document/multiple/re-queue/$', 're_queue_multiple_document', (), 're_queue_multiple_document'),
url(r'^document/all/clean_up/$', 'all_document_ocr_cleanup', (), 'all_document_ocr_cleanup'),
url(r'^queue/(?P<document_queue_id>\d+)/transformation/list/$', 'setup_queue_transformation_list', (), 'setup_queue_transformation_list'),
url(r'^queue/(?P<document_queue_id>\w+)/transformation/create/$', 'setup_queue_transformation_create', (), 'setup_queue_transformation_create'),
url(r'^queue/transformation/(?P<transformation_id>\w+)/edit/$', 'setup_queue_transformation_edit', (), 'setup_queue_transformation_edit'),
url(r'^queue/transformation/(?P<transformation_id>\w+)/delete/$', 'setup_queue_transformation_delete', (), 'setup_queue_transformation_delete'),
)

View File

@@ -14,56 +14,114 @@ from documents.models import Document
from documents.widgets import document_link, document_thumbnail
from common.utils import encapsulate
from acls.models import AccessEntry
from job_processor.exceptions import JobQueuePushError
from .permissions import (PERMISSION_OCR_DOCUMENT,
PERMISSION_OCR_DOCUMENT_DELETE, PERMISSION_OCR_QUEUE_ENABLE_DISABLE,
PERMISSION_OCR_CLEAN_ALL_PAGES, PERMISSION_OCR_QUEUE_EDIT)
from .models import DocumentQueue, QueueDocument, QueueTransformation
from .literals import (QUEUEDOCUMENT_STATE_PROCESSING,
DOCUMENTQUEUE_STATE_ACTIVE, DOCUMENTQUEUE_STATE_STOPPED)
from .exceptions import AlreadyQueued, ReQueueError
from .models import OCRProcessingSingleton
from .exceptions import (AlreadyQueued, ReQueueError, OCRProcessingAlreadyDisabled,
OCRProcessingAlreadyEnabled)
from .api import clean_pages
from .forms import QueueTransformationForm, QueueTransformationForm_create
from . import ocr_job_queue, ocr_job_type
def queue_document_list(request, queue_name='default'):
def ocr_log(request):
Permission.objects.check_permissions(request.user, [PERMISSION_OCR_DOCUMENT])
document_queue = get_object_or_404(DocumentQueue, name=queue_name)
return object_list(
request,
queryset=document_queue.queuedocument_set.all(),
template_name='generic_list.html',
extra_context={
'title': _(u'documents in queue: %s') % document_queue,
'hide_object': True,
'queue': document_queue,
'object_name': _(u'document queue'),
'navigation_object_name': 'queue',
'list_object_variable_name': 'queue_document',
'extra_columns': [
{'name': 'document', 'attribute': encapsulate(lambda x: document_link(x.document) if hasattr(x, 'document') else _(u'Missing document.'))},
{'name': _(u'thumbnail'), 'attribute': encapsulate(lambda x: document_thumbnail(x.document))},
{'name': 'submitted', 'attribute': encapsulate(lambda x: unicode(x.datetime_submitted).split('.')[0]), 'keep_together':True},
{'name': 'delay', 'attribute': 'delay'},
{'name': 'state', 'attribute': encapsulate(lambda x: x.get_state_display())},
{'name': 'node', 'attribute': 'node_name'},
{'name': 'result', 'attribute': 'result'},
],
'multi_select_as_buttons': True,
'sidebar_subtemplates_list': [
{
'name': 'generic_subtemplate.html',
'context': {
'side_bar': True,
'title': _(u'document queue properties'),
'content': _(u'Current state: %s') % document_queue.get_state_display(),
}
context = {
'queue': OCRProcessingSingleton.get(),
'object_name': _(u'OCR processing'), # TODO fix, not working
'navigation_object_name': 'queue',
'object_list': [],
'title': _(u'OCR log items'),
#'hide_object': True,
#'hide_link': True,
'extra_columns': [
{'name': _(u'document'), 'attribute': encapsulate(lambda x: document_link(x.document_version.document) if hasattr(x, 'document_version') else _(u'Missing document.'))},
{'name': _(u'version'), 'attribute': 'document_version'},
{'name': _(u'thumbnail'), 'attribute': encapsulate(lambda x: document_thumbnail(x.document_version.document))},
{'name': _('submitted'), 'attribute': encapsulate(lambda x: unicode(x.datetime_submitted).split('.')[0]), 'keep_together':True},
#{'name': _('delay'), 'attribute': 'delay'},
#{'name': _('state'), 'attribute': encapsulate(lambda x: x.get_state_display())},
#{'name': _('node'), 'attribute': 'node_name'},
{'name': _('result'), 'attribute': 'result'},
],
'multi_select_as_buttons': True,
'sidebar_subtemplates_list': [
{
'name': 'generic_subtemplate.html',
'context': {
'side_bar': True,
'title': _(u'OCR processing properties'),
'content': _(u'Current state: %s') % OCRProcessingSingleton.get().get_state_display(),
}
]
},
)
}
]
}
return render_to_response('generic_list.html', context,
context_instance=RequestContext(request))
# 'queue': document_queue,
# 'object_name': _(u'document queue'),
# 'navigation_object_name': 'queue',
# 'list_object_variable_name': 'queue_document',
# },
#)
def ocr_disable(request):
    """Confirm (GET) and perform (POST) disabling of OCR processing."""
    Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_ENABLE_DISABLE])

    referer = request.META.get('HTTP_REFERER', None)
    next = request.POST.get('next', request.GET.get('next', referer))
    previous = request.POST.get('previous', request.GET.get('previous', referer))

    if request.method == 'POST':
        try:
            OCRProcessingSingleton.get().disable()
        except OCRProcessingAlreadyDisabled:
            # Nothing to do; send the user back where they came from.
            messages.warning(request, _(u'OCR processing already disabled.'))
            return HttpResponseRedirect(previous)
        messages.success(request, _(u'OCR processing disabled successfully.'))
        return HttpResponseRedirect(next)

    # GET: show the confirmation page.
    context = {
        'queue': OCRProcessingSingleton.get(),
        'navigation_object_name': 'queue',
        'title': _(u'Are you sure you wish to disable OCR processing?'),
        'next': next,
        'previous': previous,
        'form_icon': u'control_stop_blue.png',
    }
    return render_to_response('generic_confirm.html', context,
        context_instance=RequestContext(request))
def ocr_enable(request):
    """Confirm (GET) and perform (POST) enabling of OCR processing.

    Bug fix: this view previously caught OCRProcessingAlreadyDisabled,
    which .enable() never raises — an already-enabled singleton raised
    an unhandled OCRProcessingAlreadyEnabled instead of showing the
    warning message. Catch the correct exception.
    """
    Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_ENABLE_DISABLE])
    next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None)))
    previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None)))

    if request.method == 'POST':
        try:
            OCRProcessingSingleton.get().enable()
        except OCRProcessingAlreadyEnabled:
            messages.warning(request, _(u'OCR processing already enabled.'))
            return HttpResponseRedirect(previous)
        else:
            messages.success(request, _(u'OCR processing enabled successfully.'))
            return HttpResponseRedirect(next)

    return render_to_response('generic_confirm.html', {
        'queue': OCRProcessingSingleton.get(),
        'navigation_object_name': 'queue',
        'title': _(u'Are you sure you wish to enable OCR processing?'),
        'next': next,
        'previous': previous,
        'form_icon': u'control_play_blue.png',
    }, context_instance=RequestContext(request))
def queue_document_delete(request, queue_document_id=None, queue_document_id_list=None):
@@ -136,15 +194,16 @@ def submit_document(request, document_id):
def submit_document_to_queue(request, document, post_submit_redirect=None):
'''
"""
This view is meant to be reusable
'''
"""
try:
document_queue = DocumentQueue.objects.queue_document(document)
messages.success(request, _(u'Document: %(document)s was added to the OCR queue: %(queue)s.') % {
'document': document, 'queue': document_queue.label})
except AlreadyQueued:
document.submit_for_ocr()
#ocr_job_queue.push(ocr_job_type, document_version_pk=document.latest_version.pk)
messages.success(request, _(u'Document: %(document)s was added to the OCR queue sucessfully.') % {
'document': document})
except JobQueuePushError:
messages.warning(request, _(u'Document: %(document)s is already queued.') % {
'document': document})
except Exception, e:
@@ -175,12 +234,12 @@ def re_queue_document(request, queue_document_id=None, queue_document_id_list=No
messages.success(
request,
_(u'Document: %(document)s was re-queued to the OCR queue: %(queue)s') % {
'document': queue_document.document,
'document': queue_document.document_version.document,
'queue': queue_document.document_queue.label
}
)
except Document.DoesNotExist:
messages.error(request, _(u'Document id#: %d, no longer exists.') % queue_document.document_id)
messages.error(request, _(u'Document no longer in queue.'))
except ReQueueError:
messages.warning(
request,
@@ -208,60 +267,6 @@ def re_queue_multiple_document(request):
return re_queue_document(request, queue_document_id_list=request.GET.get('id_list', []))
def document_queue_disable(request, document_queue_id):
    """Confirm (GET) and perform (POST) stopping of a document queue."""
    Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_ENABLE_DISABLE])

    referer = request.META.get('HTTP_REFERER', None)
    next = request.POST.get('next', request.GET.get('next', referer))
    previous = request.POST.get('previous', request.GET.get('previous', referer))

    document_queue = get_object_or_404(DocumentQueue, pk=document_queue_id)

    # Already stopped: warn and bounce back without showing the form.
    if document_queue.state == DOCUMENTQUEUE_STATE_STOPPED:
        messages.warning(request, _(u'Document queue: %s, already stopped.') % document_queue)
        return HttpResponseRedirect(previous)

    if request.method == 'POST':
        document_queue.state = DOCUMENTQUEUE_STATE_STOPPED
        document_queue.save()
        messages.success(request, _(u'Document queue: %s, stopped successfully.') % document_queue)
        return HttpResponseRedirect(next)

    # GET: show the confirmation page.
    context = {
        'queue': document_queue,
        'navigation_object_name': 'queue',
        'title': _(u'Are you sure you wish to disable document queue: %s') % document_queue,
        'next': next,
        'previous': previous,
        'form_icon': u'control_stop_blue.png',
    }
    return render_to_response('generic_confirm.html', context,
        context_instance=RequestContext(request))
def document_queue_enable(request, document_queue_id):
    """Confirm (GET) and perform (POST) activation of a document queue."""
    Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_ENABLE_DISABLE])

    referer = request.META.get('HTTP_REFERER', None)
    next = request.POST.get('next', request.GET.get('next', referer))
    previous = request.POST.get('previous', request.GET.get('previous', referer))

    document_queue = get_object_or_404(DocumentQueue, pk=document_queue_id)

    # Already active: warn and bounce back without showing the form.
    if document_queue.state == DOCUMENTQUEUE_STATE_ACTIVE:
        messages.warning(request, _(u'Document queue: %s, already active.') % document_queue)
        return HttpResponseRedirect(previous)

    if request.method == 'POST':
        document_queue.state = DOCUMENTQUEUE_STATE_ACTIVE
        document_queue.save()
        messages.success(request, _(u'Document queue: %s, activated successfully.') % document_queue)
        return HttpResponseRedirect(next)

    # GET: show the confirmation page.
    context = {
        'queue': document_queue,
        'navigation_object_name': 'queue',
        'title': _(u'Are you sure you wish to activate document queue: %s') % document_queue,
        'next': next,
        'previous': previous,
        'form_icon': u'control_play_blue.png',
    }
    return render_to_response('generic_confirm.html', context,
        context_instance=RequestContext(request))
def all_document_ocr_cleanup(request):
Permission.objects.check_permissions(request.user, [PERMISSION_OCR_CLEAN_ALL_PAGES])
@@ -297,126 +302,3 @@ def display_link(obj):
return u''.join(output)
else:
return obj
# Setup views
def setup_queue_transformation_list(request, document_queue_id):
    """Render the list of transformations configured for a document queue."""
    Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_EDIT])
    queue = get_object_or_404(DocumentQueue, pk=document_queue_id)

    columns = [
        {'name': _(u'order'), 'attribute': 'order'},
        {'name': _(u'transformation'), 'attribute': encapsulate(lambda x: x.get_transformation_display())},
        {'name': _(u'arguments'), 'attribute': 'arguments'}
    ]

    return render_to_response(
        'generic_list.html',
        {
            'object_list': QueueTransformation.transformations.get_for_object(queue),
            'title': _(u'transformations for: %s') % queue,
            'queue': queue,
            'object_name': _(u'document queue'),
            'navigation_object_name': 'queue',
            'list_object_variable_name': 'transformation',
            'extra_columns': columns,
            'hide_link': True,
            'hide_object': True,
        },
        context_instance=RequestContext(request))
def setup_queue_transformation_edit(request, transformation_id):
    """Edit an existing queue transformation via QueueTransformationForm."""
    Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_EDIT])
    transformation = get_object_or_404(QueueTransformation, pk=transformation_id)
    # Fall back to the owning queue's transformation list when no
    # explicit 'next' destination is given.
    redirect_view = reverse('setup_queue_transformation_list', args=[transformation.content_object.pk])
    next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', redirect_view)))

    if request.method == 'POST':
        form = QueueTransformationForm(instance=transformation, data=request.POST)
        if form.is_valid():
            try:
                form.save()
                messages.success(request, _(u'Queue transformation edited successfully'))
                return HttpResponseRedirect(next)
            except Exception, e:  # Python 2 except syntax; show save errors inline
                messages.error(request, _(u'Error editing queue transformation; %s') % e)
    else:
        form = QueueTransformationForm(instance=transformation)

    # GET, invalid form, or failed save: re-render the form page.
    return render_to_response('generic_form.html', {
        'title': _(u'Edit transformation: %s') % transformation,
        'form': form,
        'queue': transformation.content_object,
        'transformation': transformation,
        'navigation_object_list': [
            {'object': 'queue', 'name': _(u'document queue')},
            {'object': 'transformation', 'name': _(u'transformation')}
        ],
        'next': next,
    },
    context_instance=RequestContext(request))
def setup_queue_transformation_delete(request, transformation_id):
    """Confirm (GET) and perform (POST) deletion of a queue transformation."""
    Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_EDIT])
    transformation = get_object_or_404(QueueTransformation, pk=transformation_id)
    # Default destination: the owning queue's transformation list.
    redirect_view = reverse('setup_queue_transformation_list', args=[transformation.content_object.pk])
    previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', redirect_view)))

    if request.method == 'POST':
        try:
            transformation.delete()
            messages.success(request, _(u'Queue transformation deleted successfully.'))
        except Exception, e:  # Python 2 except syntax; report, don't crash
            messages.error(request, _(u'Error deleting queue transformation; %(error)s') % {
                'error': e}
            )
        # Redirect to the list view whether or not the delete succeeded.
        return HttpResponseRedirect(redirect_view)

    # GET: show the confirmation page.
    return render_to_response('generic_confirm.html', {
        'delete_view': True,
        'transformation': transformation,
        'queue': transformation.content_object,
        'navigation_object_list': [
            {'object': 'queue', 'name': _(u'document queue')},
            {'object': 'transformation', 'name': _(u'transformation')}
        ],
        'title': _(u'Are you sure you wish to delete queue transformation "%(transformation)s"') % {
            'transformation': transformation.get_transformation_display(),
        },
        'previous': previous,
        'form_icon': u'shape_square_delete.png',
    },
    context_instance=RequestContext(request))
def setup_queue_transformation_create(request, document_queue_id):
    """Create a new transformation attached to a document queue."""
    Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_EDIT])
    document_queue = get_object_or_404(DocumentQueue, pk=document_queue_id)
    redirect_view = reverse('setup_queue_transformation_list', args=[document_queue.pk])

    if request.method == 'POST':
        form = QueueTransformationForm_create(request.POST)
        if form.is_valid():
            try:
                # (sic) 'queue_tranformation' is the original local name.
                # Attach the generic FK to the queue before saving.
                queue_tranformation = form.save(commit=False)
                queue_tranformation.content_object = document_queue
                queue_tranformation.save()
                messages.success(request, _(u'Queue transformation created successfully'))
                return HttpResponseRedirect(redirect_view)
            except Exception, e:  # Python 2 except syntax; show save errors inline
                messages.error(request, _(u'Error creating queue transformation; %s') % e)
    else:
        form = QueueTransformationForm_create()

    # GET, invalid form, or failed save: re-render the form page.
    return render_to_response('generic_form.html', {
        'form': form,
        'queue': document_queue,
        'object_name': _(u'document queue'),
        'navigation_object_name': 'queue',
        'title': _(u'Create new transformation for queue: %s') % document_queue,
    }, context_instance=RequestContext(request))

View File

@@ -2,43 +2,31 @@ from __future__ import absolute_import
import logging
import atexit
import sys
from .runtime import scheduler
from django.db.models.signals import post_syncdb
from django.dispatch import receiver
from south.signals import pre_migrate
from signaler.signals import pre_collectstatic
from project_tools.api import register_tool
from navigation.api import bind_links
from .links import scheduler_tool_link, scheduler_list, job_list
from .literals import SHUTDOWN_COMMANDS
from .api import LocalScheduler
from .links import job_list
logger = logging.getLogger(__name__)
@receiver(post_syncdb, dispatch_uid='scheduler_shutdown_post_syncdb')
def scheduler_shutdown_post_syncdb(sender, **kwargs):
logger.debug('Scheduler shut down on post syncdb signal')
scheduler.shutdown()
@receiver(pre_collectstatic, dispatch_uid='sheduler_shutdown_pre_collectstatic')
def sheduler_shutdown_pre_collectstatic(sender, **kwargs):
logger.debug('Scheduler shut down on collectstatic signal')
scheduler.shutdown()
@receiver(pre_migrate, dispatch_uid='sheduler_shutdown_pre_migrate')
def sheduler_shutdown_pre_migrate(sender, **kwargs):
logger.debug('Scheduler shut down on pre_migrate signal')
scheduler.shutdown()
def schedule_shutdown_on_exit():
logger.debug('Scheduler shut down on exit')
scheduler.shutdown()
logger.debug('Schedulers shut down on exit')
LocalScheduler.shutdown_all()
register_tool(job_list)
if any([command in sys.argv for command in SHUTDOWN_COMMANDS]):
logger.debug('Schedulers shut down on SHUTDOWN_COMMAND')
# Shutdown any scheduler already running
LocalScheduler.shutdown_all()
# Prevent any new scheduler afterwards to start
LocalScheduler.lockdown()
register_tool(scheduler_tool_link)
atexit.register(schedule_shutdown_on_exit)
bind_links([LocalScheduler, 'scheduler_list', 'job_list'], scheduler_list, menu_name='secondary_menu')
bind_links([LocalScheduler], job_list)

View File

@@ -1,30 +1,147 @@
from __future__ import absolute_import
from .runtime import scheduler
from .exceptions import AlreadyScheduled
import logging
registered_jobs = {}
from apscheduler.scheduler import Scheduler as OriginalScheduler
from django.utils.translation import ugettext_lazy as _
from .exceptions import AlreadyScheduled, UnknownJobClass
logger = logging.getLogger(__name__)
def register_interval_job(name, title, func, weeks=0, days=0, hours=0, minutes=0,
seconds=0, start_date=None, args=None,
kwargs=None, job_name=None, **options):
class SchedulerJobBase(object):
job_type = u''
if name in registered_jobs:
raise AlreadyScheduled
def __init__(self, name, label, function, *args, **kwargs):
self.scheduler = None
self.name = name
self.label = label
self.function = function
self.args = args
self.kwargs = kwargs
job = scheduler.add_interval_job(func=func, weeks=weeks, days=days,
hours=hours, minutes=minutes, seconds=seconds,
start_date=start_date, args=args, kwargs=kwargs, **options)
def stop(self):
self.scheduler.stop_job(self)
registered_jobs[name] = {'title': title, 'job': job}
@property
def running(self):
if self.scheduler:
return self.scheduler.running
else:
return False
@property
def start_date(self):
return self._job.trigger.start_date
def remove_job(name):
if name in registered_jobs:
scheduler.unschedule_job(registered_jobs[name]['job'])
registered_jobs.pop(name)
class IntervalJob(SchedulerJobBase):
job_type = _(u'Interval job')
def start(self, scheduler):
scheduler.add_job(self)
def get_job_list():
return registered_jobs.values()
class DateJob(SchedulerJobBase):
job_type = _(u'Date job')
def start(self, scheduler):
scheduler.add_job(self)
class LocalScheduler(object):
scheduler_registry = {}
lockdown = False
@classmethod
def get(cls, name):
return cls.scheduler_registry[name]
@classmethod
def get_all(cls):
return cls.scheduler_registry.values()
@classmethod
def shutdown_all(cls):
for scheduler in cls.scheduler_registry.values():
scheduler.stop()
@classmethod
def lockdown(cls):
cls.lockdown = True
def __init__(self, name, label=None):
self.scheduled_jobs = {}
self._scheduler = None
self.name = name
self.label = label
self.__class__.scheduler_registry[self.name] = self
def start(self):
logger.debug('starting scheduler: %s' % self.name)
if not self.__class__.lockdown:
self._scheduler = OriginalScheduler()
for job in self.scheduled_jobs.values():
self._schedule_job(job)
self._scheduler.start()
else:
logger.debug('lockdown in effect')
def stop(self):
if self._scheduler:
self._scheduler.shutdown()
del self._scheduler
self._scheduler = None
@property
def running(self):
if self._scheduler:
return self._scheduler.running
else:
return False
def clear(self):
for job in self.scheduled_jobs.values():
self.stop_job(job)
def stop_job(self, job):
self._scheduler.unschedule_job(job._job)
del(self.scheduled_jobs[job.name])
job.scheduler = None
def _schedule_job(self, job):
if isinstance(job, IntervalJob):
job._job = self._scheduler.add_interval_job(job.function, *job.args, **job.kwargs)
elif isinstance(job, DateJob):
job._job = self._scheduler.add_date_job(job.function, *job.args, **job.kwargs)
else:
raise UnknownJobClass
def add_job(self, job):
if job.scheduler or job.name in self.scheduled_jobs.keys():
raise AlreadyScheduled
if self._scheduler:
self._scheduler_job(job)
job.scheduler = self
self.scheduled_jobs[job.name] = job
def add_interval_job(self, name, label, function, *args, **kwargs):
job = IntervalJob(name=name, label=label, function=function, *args, **kwargs)
self.add_job(job)
return job
def add_date_job(self, name, label, function, *args, **kwargs):
job = DateJob(name=name, label=label, function=function, *args, **kwargs)
self.add_job(job)
return job
def get_job_list(self):
return self.scheduled_jobs.values()
def __unicode__(self):
return unicode(self.label or self.name)

View File

@@ -1,2 +1,14 @@
class AlreadyScheduled(Exception):
"""
Raised when trying to schedule a Job instance of anything after it was
already scheduled in any other scheduler
"""
pass
class UnknownJobClass(Exception):
"""
Raised when trying to schedule a Job that is not of a a type:
IntervalJob or DateJob
"""
pass

View File

@@ -4,6 +4,8 @@ from django.utils.translation import ugettext_lazy as _
from navigation.api import Link
from .permissions import PERMISSION_VIEW_JOB_LIST
from .permissions import PERMISSION_VIEW_JOB_LIST, PERMISSION_VIEW_SCHEDULER_LIST
job_list = Link(text=_(u'interval job list'), view='job_list', icon='time.png', permissions=[PERMISSION_VIEW_JOB_LIST])
scheduler_tool_link = Link(text=_(u'local schedulers'), view='scheduler_list', icon='time.png', permissions=[PERMISSION_VIEW_SCHEDULER_LIST])
scheduler_list = Link(text=_(u'scheduler list'), view='scheduler_list', sprite='time', permissions=[PERMISSION_VIEW_SCHEDULER_LIST])
job_list = Link(text=_(u'interval job list'), view='job_list', args='object.name', sprite='timeline_marker', permissions=[PERMISSION_VIEW_JOB_LIST])

View File

@@ -0,0 +1 @@
SHUTDOWN_COMMANDS = ['syncdb', 'migrate', 'schemamigration', 'datamigration', 'collectstatic', 'shell', 'shell_plus']

View File

@@ -5,4 +5,5 @@ from django.utils.translation import ugettext_lazy as _
from permissions.models import PermissionNamespace, Permission
namespace = PermissionNamespace('scheduler', _(u'Scheduler'))
PERMISSION_VIEW_JOB_LIST = Permission.objects.register(namespace, 'jobs_list', _(u'View the interval job list'))
PERMISSION_VIEW_SCHEDULER_LIST = Permission.objects.register(namespace, 'schedulers_list', _(u'View the local scheduler list'))
PERMISSION_VIEW_JOB_LIST = Permission.objects.register(namespace, 'jobs_list', _(u'View the local scheduler job list'))

View File

@@ -1,4 +0,0 @@
from apscheduler.scheduler import Scheduler
scheduler = Scheduler()
scheduler.start()

View File

@@ -1,5 +1,6 @@
from django.conf.urls.defaults import patterns, url
urlpatterns = patterns('scheduler.views',
url(r'^list/$', 'job_list', (), 'job_list'),
url(r'^scheduler/list/$', 'scheduler_list', (), 'scheduler_list'),
url(r'^scheduler/(?P<scheduler_name>\w+)/job/list/$', 'job_list', (), 'job_list'),
)

View File

@@ -3,32 +3,67 @@ from __future__ import absolute_import
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext_lazy as _
from django.http import Http404
from permissions.models import Permission
from common.utils import encapsulate
from .permissions import PERMISSION_VIEW_JOB_LIST
from .api import get_job_list
from .permissions import PERMISSION_VIEW_SCHEDULER_LIST, PERMISSION_VIEW_JOB_LIST
from .api import LocalScheduler
def job_list(request):
Permission.objects.check_permissions(request.user, [PERMISSION_VIEW_JOB_LIST])
def scheduler_list(request):
Permission.objects.check_permissions(request.user, [PERMISSION_VIEW_SCHEDULER_LIST])
context = {
'object_list': get_job_list(),
'title': _(u'interval jobs'),
'object_list': LocalScheduler.get_all(),
'title': _(u'local schedulers'),
'extra_columns': [
{
'name': _(u'name'),
'attribute': 'name'
},
{
'name': _(u'label'),
'attribute': encapsulate(lambda job: job['title'])
'attribute': 'label'
},
{
'name': _(u'running'),
'attribute': 'running'
},
],
'hide_object': True,
}
return render_to_response('generic_list.html', context,
context_instance=RequestContext(request))
def job_list(request, scheduler_name):
Permission.objects.check_permissions(request.user, [PERMISSION_VIEW_JOB_LIST])
try:
scheduler = LocalScheduler.get(scheduler_name)
except:
raise Http404
context = {
'object_list': scheduler.get_job_list(),
'title': _(u'local jobs in scheduler: %s') % scheduler,
'extra_columns': [
{
'name': _(u'name'),
'attribute': 'name'
},
{
'name': _(u'label'),
'attribute': 'label'
},
{
'name': _(u'start date time'),
'attribute': encapsulate(lambda job: job['job'].trigger.start_date)
'attribute': 'start_date'
},
{
'name': _(u'interval'),
'attribute': encapsulate(lambda job: job['job'].trigger.interval)
'name': _(u'type'),
'attribute': 'job_type'
},
],
'hide_object': True,

View File

@@ -6,7 +6,7 @@ from navigation.api import (bind_links,
register_model_list_columns)
from common.utils import encapsulate
from project_setup.api import register_setup
from scheduler.api import register_interval_job
from scheduler.api import LocalScheduler
from documents.models import Document
from .staging import StagingFile
@@ -62,8 +62,10 @@ register_model_list_columns(StagingFile, [
register_setup(setup_sources)
register_interval_job('task_fetch_pop3_emails', _(u'Connects to the POP3 email sources and fetches the attached documents.'), task_fetch_pop3_emails, seconds=EMAIL_PROCESSING_INTERVAL)
register_interval_job('task_fetch_imap_emails', _(u'Connects to the IMAP email sources and fetches the attached documents.'), task_fetch_imap_emails, seconds=EMAIL_PROCESSING_INTERVAL)
sources_scheduler = LocalScheduler('sources', _(u'Document sources'))
sources_scheduler.add_interval_job('task_fetch_pop3_emails', _(u'Connects to the POP3 email sources and fetches the attached documents.'), task_fetch_pop3_emails, seconds=EMAIL_PROCESSING_INTERVAL)
sources_scheduler.add_interval_job('task_fetch_imap_emails', _(u'Connects to the IMAP email sources and fetches the attached documents.'), task_fetch_imap_emails, seconds=EMAIL_PROCESSING_INTERVAL)
sources_scheduler.start()
bind_links(['document_list_recent', 'document_list', 'document_create', 'document_create_multiple', 'upload_interactive', 'staging_file_delete'], [document_create_multiple], menu_name='secondary_menu')
bind_links([Document], [document_create_multiple], menu_name='secondary_menu')

View File

@@ -27,7 +27,6 @@ from converter.literals import DIMENSION_SEPARATOR
from documents.models import Document, DocumentType
from documents.events import history_document_created
from metadata.api import save_metadata_list
from scheduler.api import register_interval_job, remove_job
from acls.utils import apply_default_acls
from .managers import SourceTransformationManager, SourceLogManager
@@ -43,6 +42,7 @@ from .literals import (SOURCE_CHOICES, SOURCE_CHOICES_PLURAL,
IMAP_DEFAULT_MAILBOX)
from .compressed_file import CompressedFile, NotACompressedFile
from .conf.settings import POP3_TIMEOUT
#from . import sources_scheduler
logger = logging.getLogger(__name__)
@@ -441,17 +441,18 @@ class WatchFolder(BaseModel):
interval = models.PositiveIntegerField(verbose_name=_(u'interval'), help_text=_(u'Inverval in seconds where the watch folder path is checked for new documents.'))
def save(self, *args, **kwargs):
if self.pk:
remove_job(self.internal_name())
#if self.pk:
# remove_job(self.internal_name())
super(WatchFolder, self).save(*args, **kwargs)
self.schedule()
def schedule(self):
if self.enabled:
register_interval_job(self.internal_name(),
title=self.fullname(), func=self.execute,
kwargs={'source_id': self.pk}, seconds=self.interval
)
pass
#if self.enabled:
# sources_scheduler.add_interval_job(self.internal_name(),
# title=self.fullname(), function=self.execute,
# seconds=self.interval, kwargs={'source_id': self.pk}
# )
def execute(self, source_id):
source = WatchFolder.objects.get(pk=source_id)

View File

@@ -73,10 +73,13 @@ Afterwards migrate existing database schema with::
$ ./manage.py migrate metadata 0001 --fake
$ ./manage.py migrate acls 0001 --fake
$ ./manage.py migrate ocr 0001 --fake
$ ./manage.py migrate ocr
$ ./manage.py migrate history 0001 --fake
$ ./manage.py migrate tags 0001 --fake
$ ./manage.py migrate linking 0001 --fake
$ ./manage.py migrate lock_manager 0001 --fake
$ ./manage.py migrate job_processor
$ ./manage.py migrate clustering
Issue the following command to index existing documents in the new full text search database::

View File

@@ -0,0 +1,10 @@
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.%(django_database_driver)s',
'NAME': '%(database_name)s',
'USER': '%(database_username)s',
'PASSWORD': '%(database_password)s',
'HOST': '%(database_host)s',
'PORT': '',
}
}

View File

@@ -162,6 +162,7 @@ INSTALLED_APPS = (
'converter',
'user_management',
'mimetype',
'clustering',
'scheduler',
'job_processor',
# Mayan EDMS
@@ -187,7 +188,7 @@ INSTALLED_APPS = (
'workflows',
'checkouts',
'rest_api',
'bootstrap',
#'bootstrap',
'statistics',
# Has to be last so the other apps can register it's signals

View File

@@ -36,10 +36,12 @@ urlpatterns = patterns('',
(r'^checkouts/', include('checkouts.urls')),
(r'^installation/', include('installation.urls')),
(r'^scheduler/', include('scheduler.urls')),
(r'^job_processing/', include('job_processor.urls')),
(r'^bootstrap/', include('bootstrap.urls')),
(r'^diagnostics/', include('diagnostics.urls')),
(r'^maintenance/', include('maintenance.urls')),
(r'^statistics/', include('statistics.urls')),
(r'^clustering/', include('clustering.urls')),
)