From e0fbac66d113f1a27c0305bb50d3d6008e91d301 Mon Sep 17 00:00:00 2001 From: Roberto Rosario Date: Mon, 30 Jul 2012 16:23:38 -0400 Subject: [PATCH] Connect the worker with the job queue item that spawned it --- ...3_auto__add_field_worker_job_queue_item.py | 60 +++++++++++++++++++ apps/job_processor/models.py | 3 +- apps/job_processor/views.py | 34 ++++++++++- 3 files changed, 95 insertions(+), 2 deletions(-) create mode 100644 apps/job_processor/migrations/0003_auto__add_field_worker_job_queue_item.py diff --git a/apps/job_processor/migrations/0003_auto__add_field_worker_job_queue_item.py b/apps/job_processor/migrations/0003_auto__add_field_worker_job_queue_item.py new file mode 100644 index 0000000000..31469b6b5b --- /dev/null +++ b/apps/job_processor/migrations/0003_auto__add_field_worker_job_queue_item.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding field 'Worker.job_queue_item' + db.add_column('job_processor_worker', 'job_queue_item', + self.gf('django.db.models.fields.related.ForeignKey')(default=1, to=orm['job_processor.JobQueueItem']), + keep_default=False) + + + def backwards(self, orm): + # Deleting field 'Worker.job_queue_item' + db.delete_column('job_processor_worker', 'job_queue_item_id') + + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'}) + }, + 'job_processor.jobqueue': { + 'Meta': {'object_name': 'JobQueue'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), + 'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'}) + }, + 'job_processor.jobqueueitem': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}), + 'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}), + 'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'}) + }, + 'job_processor.worker': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}), + 'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}) + } + } + + complete_apps = ['job_processor'] \ No newline at end of file diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index 6fb7b52962..72a9142c51 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -35,7 +35,7 @@ class Job(object): # Run sync or launch async subprocess # OR launch 2 processes: monitor & actual process node = Node.objects.myself() - worker = Worker.objects.create(node=node, name=os.getpid()) + worker = Worker.objects.create(node=node, name=os.getpid(), job_queue_item=job_queue_item) try: close_connection() transaction.commit_on_success(function)(**loads(job_queue_item.kwargs)) @@ -180,6 +180,7 @@ class Worker(models.Model): choices=WORKER_STATE_CHOICES, default=WORKER_STATE_RUNNING, verbose_name=_(u'state')) + job_queue_item = models.ForeignKey(JobQueueItem, verbose_name=_(u'job queue item')) def __unicode__(self): return u'%s-%s' % (self.node.hostname, self.name) diff --git a/apps/job_processor/views.py b/apps/job_processor/views.py index fa8a2112fe..b1b5461c0b 100644 --- a/apps/job_processor/views.py +++ b/apps/job_processor/views.py @@ -28,8 +28,40 @@ def node_workers(request, node_pk): 'object_list': node.workers().all(), 'title': _(u'workers for node: %s') % node, 'object': node, - #'hide_object': True, + 'hide_links': True, + 'extra_columns': [ + { + 'name': _(u'created'), + 'attribute': 'creation_datetime', + }, + { + 'name': _(u'heartbeat'), + 'attribute': 'heartbeat', + }, + { + 'name': _(u'state'), + 'attribute': 'get_state_display', + }, + { + 'name': _(u'job queue item'), + 'attribute': 'job_queue_item', + }, + { + 'name': _(u'job type'), + 'attribute': 'job_queue_item.job_type', + }, + ], } return render_to_response('generic_list.html', context, context_instance=RequestContext(request)) + + + node = models.ForeignKey(Node, verbose_name=_(u'node')) + name = models.CharField(max_length=255, verbose_name=_(u'name')) + creation_datetime = models.DateTimeField(verbose_name=_(u'creation datetime'), default=lambda: datetime.datetime.now(), editable=False) + heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now(), verbose_name=_(u'heartbeat check')) + stat#e = models.CharField(max_length=4, + #choices=WORKER_STATE_CHOICES, + #default=WORKER_STATE_RUNNING, + #verbose_name=_(u'state'))