diff --git a/apps/checkouts/__init__.py b/apps/checkouts/__init__.py index f176cfa01a..1991c69cf1 100644 --- a/apps/checkouts/__init__.py +++ b/apps/checkouts/__init__.py @@ -3,7 +3,7 @@ from __future__ import absolute_import from django.utils.translation import ugettext_lazy as _ from navigation.api import bind_links, register_top_menu -from scheduler.api import register_interval_job +from scheduler.api import LocalScheduler from documents.models import Document from acls.api import class_permissions @@ -14,6 +14,7 @@ from .permissions import (PERMISSION_DOCUMENT_CHECKOUT, from .links import checkout_list, checkout_document, checkout_info, checkin_document from .models import DocumentCheckout from .tasks import task_check_expired_check_outs +from .literals import CHECK_EXPIRED_CHECK_OUTS_INTERVAL def initialize_document_checkout_extra_methods(): @@ -34,6 +35,8 @@ class_permissions(Document, [ PERMISSION_DOCUMENT_RESTRICTIONS_OVERRIDE ]) -CHECK_EXPIRED_CHECK_OUTS_INTERVAL = 60 # Lowest check out expiration allowed -register_interval_job('task_check_expired_check_outs', _(u'Check expired check out documents and checks them in.'), task_check_expired_check_outs, seconds=CHECK_EXPIRED_CHECK_OUTS_INTERVAL) +checkouts_scheduler = LocalScheduler('checkouts', _(u'Document checkouts')) +checkouts_scheduler.add_interval_job('task_check_expired_check_outs', _(u'Check expired check out documents and checks them in.'), task_check_expired_check_outs, seconds=CHECK_EXPIRED_CHECK_OUTS_INTERVAL) +checkouts_scheduler.start() + initialize_document_checkout_extra_methods() diff --git a/apps/checkouts/literals.py b/apps/checkouts/literals.py index 23e9920984..22ac3c1279 100644 --- a/apps/checkouts/literals.py +++ b/apps/checkouts/literals.py @@ -14,3 +14,5 @@ STATE_LABELS = { STATE_CHECKED_OUT: _(u'checked out'), STATE_CHECKED_IN: _(u'checked in/available'), } + +CHECK_EXPIRED_CHECK_OUTS_INTERVAL = 60 # Lowest check out expiration allowed diff --git a/apps/clustering/__init__.py b/apps/clustering/__init__.py index f5b453f6d2..66a8549c26 100644 --- a/apps/clustering/__init__.py +++ b/apps/clustering/__init__.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from django.utils.translation import ugettext_lazy as _ -from scheduler.api import register_interval_job +from scheduler.api import LocalScheduler from navigation.api import bind_links from project_tools.api import register_tool @@ -10,8 +10,10 @@ from .tasks import node_heartbeat, house_keeping from .links import tool_link, node_list from .models import Node, ClusteringConfig -register_interval_job('node_heartbeat', _(u'Update a node\'s properties.'), node_heartbeat, seconds=ClusteringConfig.get().node_heartbeat_interval) -register_interval_job('house_keeping', _(u'Check for unresponsive nodes in the cluster list.'), house_keeping, seconds=1) +clustering_scheduler = LocalScheduler('clustering', _(u'Clustering')) +clustering_scheduler.add_interval_job('node_heartbeat', _(u'Update a node\'s properties.'), node_heartbeat, seconds=ClusteringConfig.get().node_heartbeat_interval) +clustering_scheduler.add_interval_job('house_keeping', _(u'Check for unresponsive nodes in the cluster list.'), house_keeping, seconds=1) +clustering_scheduler.start() register_tool(tool_link) bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu') diff --git a/apps/dynamic_search/__init__.py b/apps/dynamic_search/__init__.py index 55718876be..ae5422c437 100644 --- a/apps/dynamic_search/__init__.py +++ b/apps/dynamic_search/__init__.py @@ -9,9 +9,8 @@ from django.core.management import call_command from navigation.api import register_sidebar_template, bind_links, Link from documents.models import Document -from scheduler.runtime import scheduler +from scheduler.api import LocalScheduler from signaler.signals import post_update_index, pre_update_index -from scheduler.api import register_interval_job from lock_manager import Lock, LockError from .models import IndexableObject @@ -36,7 +35,7 @@ def scheduler_shutdown_pre_update_index(sender, mayan_runtime, **kwargs): # Only shutdown the scheduler if the command is called from the command # line if not mayan_runtime: - scheduler.shutdown() + LocalScheduler.shutdown_all() def search_index_update(): @@ -61,4 +60,6 @@ def search_index_update(): bind_links(['search', 'search_advanced', 'results'], [search], menu_name='form_header') bind_links(['results'], [search_again], menu_name='sidebar') -register_interval_job('search_index_update', _(u'Update the search index with the most recent modified documents.'), search_index_update, seconds=INDEX_UPDATE_INTERVAL) +dynamic_search_scheduler = LocalScheduler('search', _(u'Search')) +dynamic_search_scheduler.add_interval_job('search_index_update', _(u'Update the search index with the most recent modified documents.'), search_index_update, seconds=INDEX_UPDATE_INTERVAL) +dynamic_search_scheduler.start() diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index b66ada46cf..66fb010bff 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from django.utils.translation import ugettext_lazy as _ -from scheduler.api import register_interval_job +from scheduler.api import LocalScheduler from navigation.api import bind_links, register_model_list_columns from project_tools.api import register_tool from common.utils import encapsulate @@ -14,9 +14,12 @@ from .tasks import job_queue_poll from .links import (node_workers, job_queues, tool_link, job_queue_items_pending, job_queue_items_error, job_queue_items_active) +#TODO: fix this, make it cluster wide JOB_QUEUE_POLL_INTERVAL = 1 -register_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JOB_QUEUE_POLL_INTERVAL) +job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor')) +job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JOB_QUEUE_POLL_INTERVAL) +job_processor_scheduler.start() register_tool(tool_link) bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu') diff --git a/apps/ocr/__init__.py b/apps/ocr/__init__.py index 4b04ad41f7..b5447d8fa7 100644 --- a/apps/ocr/__init__.py +++ b/apps/ocr/__init__.py @@ -15,7 +15,6 @@ from documents.models import Document, DocumentVersion from maintenance.api import register_maintenance_links from project_tools.api import register_tool from acls.api import class_permissions -from scheduler.api import register_interval_job from statistics.api import register_statistics from job_processor.models import JobQueue, JobType from job_processor.exceptions import JobQueuePushError diff --git a/apps/sources/__init__.py b/apps/sources/__init__.py index 4daa0ccd04..6ed77c392b 100644 --- a/apps/sources/__init__.py +++ b/apps/sources/__init__.py @@ -6,7 +6,7 @@ from navigation.api import (bind_links, register_model_list_columns) from common.utils import encapsulate from project_setup.api import register_setup -from scheduler.api import register_interval_job +from scheduler.api import LocalScheduler from documents.models import Document from .staging import StagingFile @@ -62,8 +62,10 @@ register_model_list_columns(StagingFile, [ register_setup(setup_sources) -register_interval_job('task_fetch_pop3_emails', _(u'Connects to the POP3 email sources and fetches the attached documents.'), task_fetch_pop3_emails, seconds=EMAIL_PROCESSING_INTERVAL) -register_interval_job('task_fetch_imap_emails', _(u'Connects to the IMAP email sources and fetches the attached documents.'), task_fetch_imap_emails, seconds=EMAIL_PROCESSING_INTERVAL) +sources_scheduler = LocalScheduler('sources', _(u'Document sources')) +sources_scheduler.add_interval_job('task_fetch_pop3_emails', _(u'Connects to the POP3 email sources and fetches the attached documents.'), task_fetch_pop3_emails, seconds=EMAIL_PROCESSING_INTERVAL) +sources_scheduler.add_interval_job('task_fetch_imap_emails', _(u'Connects to the IMAP email sources and fetches the attached documents.'), task_fetch_imap_emails, seconds=EMAIL_PROCESSING_INTERVAL) +sources_scheduler.start() bind_links(['document_list_recent', 'document_list', 'document_create', 'document_create_multiple', 'upload_interactive', 'staging_file_delete'], [document_create_multiple], menu_name='secondary_menu') bind_links([Document], [document_create_multiple], menu_name='secondary_menu') diff --git a/apps/sources/models.py b/apps/sources/models.py index 31b89b66f0..9c779eebe7 100644 --- a/apps/sources/models.py +++ b/apps/sources/models.py @@ -27,7 +27,6 @@ from converter.literals import DIMENSION_SEPARATOR from documents.models import Document, DocumentType from documents.events import history_document_created from metadata.api import save_metadata_list -from scheduler.api import register_interval_job, remove_job from acls.utils import apply_default_acls from .managers import SourceTransformationManager, SourceLogManager @@ -43,6 +42,7 @@ from .literals import (SOURCE_CHOICES, SOURCE_CHOICES_PLURAL, IMAP_DEFAULT_MAILBOX) from .compressed_file import CompressedFile, NotACompressedFile from .conf.settings import POP3_TIMEOUT +#from . import sources_scheduler logger = logging.getLogger(__name__) @@ -441,17 +441,18 @@ class WatchFolder(BaseModel): interval = models.PositiveIntegerField(verbose_name=_(u'interval'), help_text=_(u'Inverval in seconds where the watch folder path is checked for new documents.')) def save(self, *args, **kwargs): - if self.pk: - remove_job(self.internal_name()) + #if self.pk: + # remove_job(self.internal_name()) super(WatchFolder, self).save(*args, **kwargs) self.schedule() def schedule(self): - if self.enabled: - register_interval_job(self.internal_name(), - title=self.fullname(), func=self.execute, - kwargs={'source_id': self.pk}, seconds=self.interval - ) + pass + #if self.enabled: + # sources_scheduler.add_interval_job(self.internal_name(), + # title=self.fullname(), function=self.execute, + # seconds=self.interval, kwargs={'source_id': self.pk} + # ) def execute(self, source_id): source = WatchFolder.objects.get(pk=source_id)