Skip to content
This repository was archived by the owner on Feb 22, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 41 additions & 22 deletions pymjq/jobqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,33 @@

class JobQueue:

def __init__(self, db, silent=False):
# Capped collection documents can not have its size updated
# https://docs.mongodb.com/manual/core/capped-collections/#document-size
DONE = 'done'.ljust(10, '_')
WAITING = 'waiting'.ljust(10, '_')
WORKING = 'working'.ljust(10, '_')

def __init__(self, db, silent=False, iterator_wait=None):
""" Return an instance of a JobQueue.
Initialization requires one argument, the database,
since we use one jobqueue collection to cover all
sites in an installation/database. The second
sites in an installation/database. The second
argument specifies if to print status while waiting
for new job, the default value is False"""
self.db = db
self.silent=silent
if not self._exists():
print ('Creating jobqueue collection.')
self._create()
self.q = self.db['jobqueue']
self.iterator_wait = iterator_wait
if self.iterator_wait is None:
def deafult_iterator_wait():
time.sleep(5)
if not silent:
print ('waiting!')
return True

self.iterator_wait = deafult_iterator_wait

def _create(self, capped=True):
""" Creates a Capped Collection. """
Expand All @@ -29,6 +43,11 @@ def _create(self, capped=True):
except:
raise Exception('Collection "jobqueue" already created')

def _find_opts(self):
if hasattr(pymongo, 'CursorType'):
return {'cursor_type': pymongo.CursorType.TAILABLE_AWAIT} # pylint: disable=no-member
return {'Tailable': True}

def _exists(self):
""" Ensures that the jobqueue collection exists in the DB. """
return 'jobqueue' in self.db.collection_names()
Expand All @@ -42,11 +61,11 @@ def valid(self):

def next(self):
""" Runs the next job in the queue. """
cursor = self.q.find({'status': 'waiting'},
tailable=True)
cursor = self.q.find({'status': self.WAITING},
**self._find_opts())
if cursor:
row = cursor.next()
row['status'] = 'done'
row['status'] = self.DONE
row['ts']['started'] = datetime.now()
row['ts']['done'] = datetime.now()
self.q.save(row)
Expand All @@ -61,7 +80,7 @@ def pub(self, data=None):
ts={'created': datetime.now(),
'started': datetime.now(),
'done': datetime.now()},
status='waiting',
status=self.WAITING,
data=data)
try:
self.q.insert(doc, manipulate=False)
Expand All @@ -72,35 +91,35 @@ def pub(self, data=None):
def __iter__(self):
""" Iterates through all docs in the queue
andw aits for new jobs when queue is empty. """
cursor = self.q.find({'status': 'waiting'}, tailable=True)
while 1:
cursor = self.q.find({'status': self.WAITING},
**self._find_opts())
get_next = True
while get_next:
try:
row = cursor.next()
try:
result = self.q.update({'_id': row['_id'],
'status': 'waiting'},
{'$set': {
'status': 'working',
'ts.started': datetime.now()
}
})
except OperationFailure:
self.q.update({'_id': row['_id'],
'status': self.WAITING},
{'$set': {
'status': self.WORKING,
'ts.started': datetime.now()
}
})
except pymongo.errors.OperationFailure:
print ('Job Failed!!')
continue
print ('---')
print ('Working on job:')
yield row
row['status'] = 'done'
row['status'] = self.DONE
row['ts']['done'] = datetime.now()
self.q.save(row)
except:
time.sleep(5)
if not self.silent:
print ('waiting!')
get_next = self.iterator_wait()

def queue_count(self):
""" Returns the number of jobs waiting in the queue. """
cursor = self.q.find({'status': 'waiting'})
cursor = self.q.find({'status': self.WAITING})
if cursor:
return cursor.count()

Expand Down
33 changes: 19 additions & 14 deletions pymjq/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@ def setUpClass(cls):
client.pymongo_test.jobqueue.drop()
cls.db = client.pymongo_test

def tearDown(self):
self.db['jobqueue'].drop()

def test_init(self):
jq = JobQueue(self.db)
self.assertTrue(jq.valid())
self.assertRaises(Exception, jq._create)
jq.clear_queue()

def test_valid(self):
jq = JobQueue(self.db)
jq.db['jobqueue'].drop()
jq._create(capped=False)
self.assertFalse(jq.valid())
self.assertRaises(Exception, jq._create)
jq.clear_queue()

def test_publish(self):
jq = JobQueue(self.db)
Expand All @@ -45,19 +46,23 @@ def test_next(self):
jq.pub(job)
row = jq.next()
self.assertEquals(row['data']['message'], 'hello world!')
jq.clear_queue()
self.assertEquals(jq.queue_count(), 0)

# def test_iter(self):
# jq = JobQueue(self.db)
# job = {'message': 'hello world!'}
# jq.pub(job)
# for job in jq:
# if job:
# self.assertTrue(True, "Found job")
# jq.clear_queue()
# return
# self.assertEquals(False, "No jobs found!")
# jq.clear_queue()
def test_iter(self):
NUM_JOBS = 3
num_jobs_queued = [NUM_JOBS]
def iterator_wait():
num_jobs_queued[0] -= 1
return num_jobs_queued[0] < 0
jq = JobQueue(self.db, iterator_wait=iterator_wait)
for ii in range(1, NUM_JOBS + 1):
job = {'message': 'I am # ' + str(ii)}
jq.pub(job)
num_jobs_done = 0
for job in jq:
print job['data']['message']
num_jobs_done += 1
self.assertEquals(num_jobs_done, NUM_JOBS)


if __name__ == '__main__':
Expand Down