How to run multiple jobs in parallel without blocking the main thread?
Dhruvin Shah edited this page Jun 7, 2019
·
2 revisions
Hello,
The official website has a nice FAQ that covers pretty much every scenario, but it was missing one case that I needed.
If you take a look at the FAQ, you will find two relevant scenarios: one shows how to run multiple jobs in parallel, and the other shows how to run the scheduler in a non-blocking manner.
Here is the thing, though: in the first case you can run hundreds of jobs in parallel, but the scheduler loop blocks the main thread. In the second case the scheduler runs on a separate thread, so the main thread is free, but the jobs run serially.
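For reference, the first case comes down to wrapping each job in its own thread. Here is a minimal sketch of that idea using only the standard library. The helper name `run_threaded` follows the FAQ as I remember it, returning the thread is my own addition, and `slow_job` is a made-up stand-in for real work.

```python
import threading
import time


def run_threaded(job_func):
    """Start job_func on its own thread and return that thread."""
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
    return job_thread


def slow_job():
    # Hypothetical stand-in for real work.
    time.sleep(0.2)
    print("job finished on", threading.current_thread().name)


if __name__ == "__main__":
    # With schedule, the job would be registered along the lines of
    #   schedule.every(10).seconds.do(run_threaded, slow_job)
    # and the loop that calls schedule.run_pending() would still
    # occupy the main thread.
    threads = [run_threaded(slow_job) for _ in range(3)]
    for t in threads:
        t.join()
```

The three jobs overlap instead of running one after another, but something still has to sit in the polling loop, which is the limitation described above.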
But what if you want to run multiple jobs in parallel without blocking the main thread? Below is a small code snippet that does that.
from schedule import Scheduler, default_scheduler
import threading
import time


def job1():
    print("I'm working job 1..")


def job2():
    print("I'm working job 2..")


def job3():
    print("I'm working job 3..")


class CustomScheduler(object):
    """
    Usage:

    With default scheduler:
        sched1 = CustomScheduler(job1)
        sched1.everyday_schedule('07:45')
        sched2 = CustomScheduler(job2)
        sched2.everyday_schedule('07:46')
        sched3 = CustomScheduler(job3)
        sched3.everyday_schedule('07:47')
        sched1.run()  # loops forever, so nothing after this line runs

    This runs all three jobs at the given times in the main thread, i.e.
    it is blocking. Even though only sched1 is used to run the jobs, all
    three run, because every job was assigned to the shared default
    Scheduler object.

    Custom scheduler:
        from schedule import Scheduler
        schedule1 = Scheduler()
        sched1 = CustomScheduler(job1, schedule1)
        sched1.everyday_schedule('07:45')
        schedule2 = Scheduler()
        sched2 = CustomScheduler(job2, schedule2)
        sched2.everyday_schedule('07:45')
        schedule3 = Scheduler()
        sched3 = CustomScheduler(job3, schedule3)
        sched3.everyday_schedule('07:45')
        # run() never returns, so calling sched1.run(), sched2.run() and
        # sched3.run() one after another would only ever poll schedule1.
        # Poll every scheduler from a single loop instead.
        while True:
            schedule1.run_pending()
            schedule2.run_pending()
            schedule3.run_pending()
            time.sleep(1)

    This runs all three jobs at the given time in the main thread, i.e.
    it is blocking. Each job lives on its own Scheduler object here, so
    every scheduler has to be polled.

    Threaded jobs:
        from schedule import Scheduler
        schedule1 = Scheduler()
        sched1 = CustomScheduler(job1, schedule1)
        sched1.everyday_schedule('07:45')
        schedule2 = Scheduler()
        sched2 = CustomScheduler(job2, schedule2)
        sched2.everyday_schedule('07:45')
        schedule3 = Scheduler()
        sched3 = CustomScheduler(job3, schedule3)
        sched3.everyday_schedule('07:45')
        sched1.threaded_schedule()
        sched2.threaded_schedule()
        sched3.threaded_schedule()
        while True:
            # do what you want to do
            time.sleep(1)

    Here every job runs in a separate thread, so the main thread is free
    to use. The setup is the same as in the "custom scheduler" case; the
    schedulers are just started with the threaded_schedule method.

    One thing to note: in threaded mode only one job should be assigned
    to each custom scheduler, otherwise an AssertionError is raised.
    """

    def __init__(self, job, schedule=default_scheduler):
        """For a multi-threaded setup you want a unique Scheduler with only
        one job in it, so create your own Scheduler object and pass it in
        as `schedule`. If none is passed, the default scheduler is used."""
        self.job = job
        self.schedule = schedule

    @property
    def get_job(self):
        """Return the list of jobs registered on this scheduler."""
        return self.schedule.jobs

    def everyday_schedule(self, at_time):
        """Schedule the job to run every day at the given time.

        at_time: time string in "HH:MM" format"""
        assert isinstance(at_time, str), 'Provide time in HH:MM string format'
        self.schedule.every().day.at(at_time).do(self.job)

    def weekly_schedule(self, at_time, day='sunday'):
        """Schedule the job to run every week on a specific day at the
        given time.

        at_time: time string in "HH:MM" format
        day: name of the weekday as a string, 'sunday' by default

        usage:
            sched1 = CustomScheduler(job1)
            sched1.weekly_schedule('08:14', 'friday')
            sched1.threaded_schedule()"""
        # every().friday, every().sunday, ... are looked up by name.
        getattr(self.schedule.every(), day).at(at_time).do(self.job)

    def run(self):
        """Poll the scheduler once per second. This loop never returns."""
        while True:
            self.schedule.run_pending()
            time.sleep(1)

    def threaded_schedule(self):
        """Run the schedule on a background daemon thread.

        Only one job may be assigned to the Scheduler object; create a
        new Scheduler object for each additional job."""
        assert len(self.schedule.jobs) == 1, \
            "there should be one job per Scheduler"
        t1 = threading.Thread(target=self.run)
        t1.daemon = True
        t1.start()
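
One limitation of the snippet above: `run` loops forever, so a thread started by `threaded_schedule` only ends when the process exits. If you need to stop a scheduler thread cleanly, a `threading.Event` can serve as a stop flag. The sketch below is one possible way to do it, not part of the schedule library. `run_in_background` is a hypothetical helper name, and it accepts any callable so it can run without schedule installed. With a real scheduler you would pass something like `schedule1.run_pending`.

```python
import threading
import time


def run_in_background(run_pending, interval=1.0):
    """Call run_pending every `interval` seconds on a daemon thread.

    Returns (thread, stop_event); set the event to end the loop.
    """
    stop_event = threading.Event()

    def loop():
        # Event.wait() acts as an interruptible sleep: it returns as soon
        # as stop_event is set, so the thread does not linger for a full
        # interval after a stop request.
        while not stop_event.is_set():
            run_pending()
            stop_event.wait(interval)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread, stop_event


if __name__ == "__main__":
    # Stand-in for Scheduler.run_pending (an assumption for this demo).
    ticks = []
    thread, stop_event = run_in_background(
        lambda: ticks.append(time.time()), interval=0.1)
    time.sleep(0.35)   # the main thread is free to do other work here
    stop_event.set()   # ask the background loop to finish
    thread.join()
    print(len(ticks), "polls completed")
```

With the class above, `run_in_background(schedule1.run_pending)` would take the place of `sched1.threaded_schedule()`, and setting the returned event stops that one scheduler without touching the others.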