Connect the worker with the job queue item that spawned it

This commit is contained in:
Roberto Rosario
2012-07-30 16:23:38 -04:00
parent d339a250fe
commit e0fbac66d1
3 changed files with 95 additions and 2 deletions

View File

@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding field 'Worker.job_queue_item'
db.add_column('job_processor_worker', 'job_queue_item',
self.gf('django.db.models.fields.related.ForeignKey')(default=1, to=orm['job_processor.JobQueueItem']),
keep_default=False)
def backwards(self, orm):
# Deleting field 'Worker.job_queue_item'
db.delete_column('job_processor_worker', 'job_queue_item_id')
models = {
'clustering.node': {
'Meta': {'object_name': 'Node'},
'cpuload': ('django.db.models.fields.PositiveIntegerField', [], {'default': '0', 'blank': 'True'}),
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'memory_usage': ('django.db.models.fields.FloatField', [], {'blank': 'True'})
},
'job_processor.jobqueue': {
'Meta': {'object_name': 'JobQueue'},
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}),
'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'})
},
'job_processor.jobqueueitem': {
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'},
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}),
'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}),
'kwargs': ('django.db.models.fields.TextField', [], {}),
'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}),
'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'})
},
'job_processor.worker': {
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'},
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)'}),
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}),
'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'})
}
}
complete_apps = ['job_processor']

View File

@@ -35,7 +35,7 @@ class Job(object):
# Run sync or launch async subprocess
# OR launch 2 processes: monitor & actual process
node = Node.objects.myself()
worker = Worker.objects.create(node=node, name=os.getpid())
worker = Worker.objects.create(node=node, name=os.getpid(), job_queue_item=job_queue_item)
try:
close_connection()
transaction.commit_on_success(function)(**loads(job_queue_item.kwargs))
@@ -180,6 +180,7 @@ class Worker(models.Model):
choices=WORKER_STATE_CHOICES,
default=WORKER_STATE_RUNNING,
verbose_name=_(u'state'))
job_queue_item = models.ForeignKey(JobQueueItem, verbose_name=_(u'job queue item'))
def __unicode__(self):
return u'%s-%s' % (self.node.hostname, self.name)

View File

@@ -28,8 +28,40 @@ def node_workers(request, node_pk):
'object_list': node.workers().all(),
'title': _(u'workers for node: %s') % node,
'object': node,
#'hide_object': True,
'hide_links': True,
'extra_columns': [
{
'name': _(u'created'),
'attribute': 'creation_datetime',
},
{
'name': _(u'heartbeat'),
'attribute': 'heartbeat',
},
{
'name': _(u'state'),
'attribute': 'get_state_display',
},
{
'name': _(u'job queue item'),
'attribute': 'job_queue_item',
},
{
'name': _(u'job type'),
'attribute': 'job_queue_item.job_type',
},
],
}
return render_to_response('generic_list.html', context,
context_instance=RequestContext(request))
node = models.ForeignKey(Node, verbose_name=_(u'node'))
name = models.CharField(max_length=255, verbose_name=_(u'name'))
creation_datetime = models.DateTimeField(verbose_name=_(u'creation datetime'), default=lambda: datetime.datetime.now(), editable=False)
heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now(), verbose_name=_(u'heartbeat check'))
stat#e = models.CharField(max_length=4,
#choices=WORKER_STATE_CHOICES,
#default=WORKER_STATE_RUNNING,
#verbose_name=_(u'state'))