Skip to content

Deadlock solution with timeout approach (draft) #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: deadlock_map_scheduler
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 78 additions & 10 deletions lambda_multiprocessing/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
from time import time
from select import select
import signal
import socket
from pickle import PickleError

from retry_sleep import IncreasingDelayManager

class Child:
proc: Process
Expand All @@ -28,6 +32,8 @@ class Child:

_closed: bool = False

_socket_timeout_s = 0.05

# if True, do the work in the main process
# but present the same interface
# and still send stuff through the pipes (to verify they're pickleable)
Expand All @@ -41,6 +47,10 @@ def __init__(self, main_proc=False):
self.proc = Process(target=self.spin)
self.proc.start()

self.set_pipe_timeout(self.parent_conn)
self.set_pipe_timeout(self.child_conn)


# each child process runs in this
# a while loop waiting for payloads from the self.child_conn
# [(id, func, args, kwds), None] -> call func(args, *kwds)
Expand All @@ -49,14 +59,52 @@ def __init__(self, main_proc=False):
# {id: (None, err)} if func raised exception err
# [None, True] -> exit gracefully (write nothing to the pipe)
def spin(self) -> None:
job_list = []
result_buf = []
while True:
(job, quit_signal) = self.child_conn.recv()
if quit_signal:
break
else:
if job_list:
# 1st priority: do work if we can
(job, quit_signal) = job_list.pop(0)
(id, func, args, kwds) = job
result = self._do_work(id, func, args, kwds)
self.child_conn.send(result)
result_buf.append(result)
elif result_buf:
# 2nd priority: send results if they're ready
result = result_buf[0] # don't remove from list yet
try:
self.child_conn.send(result)
except BlockingIOError as e:
if self.child_conn.poll():
# deadlock situation: child is sending large result
# whilst parent is sending large next task
# read from parent, then try again
# repeatedly until we get success
with IncreasingDelayManager() as dm:
while True:
dm.sleep()
delay *= backoff_factor

try:
(job, quit_signal) = self.child_conn.recv()
except (BlockingIOError, PickleError, UnicodeDecodeError):
continue
else:
job_list.append((job, quit_signal))
break
else:
# result was sent successfully
# so remove it from the buffer
result_buf.pop(0)
else:
# empty job list, fetch new jobs
try:
(job, quit_signal) = self.child_conn.recv()
except (BlockingIOError, PickleError, UnicodeDecodeError):
# deadlock issue, parent send aborted
pass
else:
job_list.append((job, quit_signal))

self.child_conn.close()

def _do_work(self, id, func, args, kwds) -> Union[Tuple[Any, None], Tuple[None, Exception]]:
Expand All @@ -75,22 +123,38 @@ def submit(self, func, args=(), kwds=None) -> 'AsyncResult':
kwds = {}
id = uuid4()
self.parent_conn.send([(id, func, args, kwds), None])

with IncreasingDelayManager() as dm:
while True:
try:
self.parent_conn.send([(id, func, args, kwds), None])
except BlockingIOError as e:
dm.sleep()
self.flush()
else:
continue

if self.main_proc:
# when doing single-process unit testing
# just do the work now
self.child_conn.recv()
ret = self._do_work(id, func, args, kwds)
self.child_conn.send(ret)

self.queue_sz += 1

return AsyncResult(id=id, child=self)

# grab all results in the pipe from child to parent
# save them to self.result_cache
def flush(self):
# watch out, when the other end is closed, a termination byte appears, so .poll() returns True
while (not self.parent_conn.closed) and (self.queue_sz > 0) and self.parent_conn.poll(0):
result = self.parent_conn.recv()
assert isinstance(list(result.keys())[0], UUID)
self.result_cache.update(result)
self.queue_sz -= 1
with IncreasingDelayManager() as dm:
while (not self.parent_conn.closed) and (self.queue_sz > 0) and self.parent_conn.poll(0):
result = self.parent_conn.recv()
assert isinstance(list(result.keys())[0], UUID)
self.result_cache.update(result)
self.queue_sz -= 1

# prevent new tasks from being submitted
# but keep existing tasks running
Expand Down Expand Up @@ -151,6 +215,10 @@ def terminate(self):
def __del__(self):
self.terminate()

def set_pipe_timeout(self, conn: Connection):
s = socket.socket(fileno=conn.fileno())
#s.settimeout(self._socket_timeout_s)
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, (8*b'\x00')+(500000).to_bytes(8, 'little'))

class AsyncResult:
def __init__(self, id: UUID, child: Child):
Expand Down
32 changes: 32 additions & 0 deletions lambda_multiprocessing/retry_sleep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from time import sleep
from random import uniform

class IncreasingDelayManager:
    """Context manager providing increasing sleeps for retry loops.

    It performs no error catching and no retrying itself — each call to
    :meth:`sleep` pauses for the current delay, then grows the delay by
    the backoff factor (clipped to ``max_delay``) with random jitter
    applied, i.e. exponential backoff with jitter.
    """

    def __init__(self, initial_delay=0.01, backoff=2, max_delay=1, jitter_factor=0.1):
        self.initial_delay = initial_delay
        self.backoff = backoff
        self.max_delay = max_delay
        self.jitter_factor = jitter_factor

    def __enter__(self):
        # (Re)start from the initial delay every time the manager is entered.
        self.delay = self.initial_delay
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing to release; exceptions propagate unchanged.
        return None

    def sleep(self):
        """Sleep for the current delay, then increase it for next time."""
        sleep(self.delay)
        # Grow by the backoff factor, capped at max_delay ...
        grown = min(self.delay * self.backoff, self.max_delay)
        # ... then perturb by +/- jitter_factor (as a fraction) so that
        # concurrent retriers don't stay in lock-step.
        self.delay = grown * uniform(1 - self.jitter_factor, 1 + self.jitter_factor)