diff --git a/pymjq/jobqueue.py b/pymjq/jobqueue.py index a82c43c..64fcd57 100644 --- a/pymjq/jobqueue.py +++ b/pymjq/jobqueue.py @@ -5,19 +5,33 @@ class JobQueue: - def __init__(self, db, silent=False): + # Capped collection documents can not have its size updated + # https://docs.mongodb.com/manual/core/capped-collections/#document-size + DONE = 'done'.ljust(10, '_') + WAITING = 'waiting'.ljust(10, '_') + WORKING = 'working'.ljust(10, '_') + + def __init__(self, db, silent=False, iterator_wait=None): """ Return an instance of a JobQueue. Initialization requires one argument, the database, since we use one jobqueue collection to cover all - sites in an installation/database. The second + sites in an installation/database. The second argument specifies if to print status while waiting for new job, the default value is False""" self.db = db - self.silent=silent if not self._exists(): print ('Creating jobqueue collection.') self._create() self.q = self.db['jobqueue'] + self.iterator_wait = iterator_wait + if self.iterator_wait is None: + def deafult_iterator_wait(): + time.sleep(5) + if not silent: + print ('waiting!') + return True + + self.iterator_wait = deafult_iterator_wait def _create(self, capped=True): """ Creates a Capped Collection. """ @@ -29,6 +43,11 @@ def _create(self, capped=True): except: raise Exception('Collection "jobqueue" already created') + def _find_opts(self): + if hasattr(pymongo, 'CursorType'): + return {'cursor_type': pymongo.CursorType.TAILABLE_AWAIT} # pylint: disable=no-member + return {'Tailable': True} + def _exists(self): """ Ensures that the jobqueue collection exists in the DB. """ return 'jobqueue' in self.db.collection_names() @@ -42,11 +61,11 @@ def valid(self): def next(self): """ Runs the next job in the queue. """ - cursor = self.q.find({'status': 'waiting'}, - tailable=True) + cursor = self.q.find({'status': self.WAITING}, + **self._find_opts()) if cursor: row = cursor.next() - row['status'] = 'done' + row['status'] = self.DONE row['ts']['started'] = datetime.now() row['ts']['done'] = datetime.now() self.q.save(row) @@ -61,7 +80,7 @@ def pub(self, data=None): ts={'created': datetime.now(), 'started': datetime.now(), 'done': datetime.now()}, - status='waiting', + status=self.WAITING, data=data) try: self.q.insert(doc, manipulate=False) @@ -72,35 +91,35 @@ def pub(self, data=None): def __iter__(self): """ Iterates through all docs in the queue andw aits for new jobs when queue is empty. """ - cursor = self.q.find({'status': 'waiting'}, tailable=True) - while 1: + cursor = self.q.find({'status': self.WAITING}, + **self._find_opts()) + get_next = True + while get_next: try: row = cursor.next() try: - result = self.q.update({'_id': row['_id'], - 'status': 'waiting'}, - {'$set': { - 'status': 'working', - 'ts.started': datetime.now() - } - }) - except OperationFailure: + self.q.update({'_id': row['_id'], + 'status': self.WAITING}, + {'$set': { + 'status': self.WORKING, + 'ts.started': datetime.now() + } + }) + except pymongo.errors.OperationFailure: print ('Job Failed!!') continue print ('---') print ('Working on job:') yield row - row['status'] = 'done' + row['status'] = self.DONE row['ts']['done'] = datetime.now() self.q.save(row) except: - time.sleep(5) - if not self.silent: - print ('waiting!') + get_next = self.iterator_wait() def queue_count(self): """ Returns the number of jobs waiting in the queue. """ - cursor = self.q.find({'status': 'waiting'}) + cursor = self.q.find({'status': self.WAITING}) if cursor: return cursor.count() diff --git a/pymjq/test.py b/pymjq/test.py index 6568128..2cd3ac7 100644 --- a/pymjq/test.py +++ b/pymjq/test.py @@ -15,11 +15,13 @@ def setUpClass(cls): client.pymongo_test.jobqueue.drop() cls.db = client.pymongo_test + def tearDown(self): + self.db['jobqueue'].drop() + def test_init(self): jq = JobQueue(self.db) self.assertTrue(jq.valid()) self.assertRaises(Exception, jq._create) - jq.clear_queue() def test_valid(self): jq = JobQueue(self.db) @@ -27,7 +29,6 @@ def test_valid(self): jq._create(capped=False) self.assertFalse(jq.valid()) self.assertRaises(Exception, jq._create) - jq.clear_queue() def test_publish(self): jq = JobQueue(self.db) @@ -45,19 +46,23 @@ def test_next(self): jq.pub(job) row = jq.next() self.assertEquals(row['data']['message'], 'hello world!') - jq.clear_queue() + self.assertEquals(jq.queue_count(), 0) - # def test_iter(self): - # jq = JobQueue(self.db) - # job = {'message': 'hello world!'} - # jq.pub(job) - # for job in jq: - # if job: - # self.assertTrue(True, "Found job") - # jq.clear_queue() - # return - # self.assertEquals(False, "No jobs found!") - # jq.clear_queue() + def test_iter(self): + NUM_JOBS = 3 + num_jobs_queued = [NUM_JOBS] + def iterator_wait(): + num_jobs_queued[0] -= 1 + return num_jobs_queued[0] < 0 + jq = JobQueue(self.db, iterator_wait=iterator_wait) + for ii in range(1, NUM_JOBS + 1): + job = {'message': 'I am # ' + str(ii)} + jq.pub(job) + num_jobs_done = 0 + for job in jq: + print job['data']['message'] + num_jobs_done += 1 + self.assertEquals(num_jobs_done, NUM_JOBS) if __name__ == '__main__':