Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions tests/basic_zip_in_out_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,44 @@ def test_no_hang(self, process_mock):
tells our test class that it actually cleaned up workers.
'''
test_passes = {}
processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 2, 3])
processes.finish_workers = lambda: test_passes.setdefault('result', True)
processes = vimap.pool.fork(
worker_proc.init_args(init=i) for i in [1, 2, 3])
processes.finish_workers = lambda: test_passes.setdefault('result',
True)
del processes # will happen if it falls out of scope

# gc.collect() -- doesn't seem necessary
T.assert_dicts_equal(test_passes, {'result': True})

def test_basic(self):
processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 2, 3])
processes = vimap.pool.fork(
worker_proc.init_args(init=i) for i in [1, 2, 3])
list(processes.zip_in_out())

def test_reuse_pool(self):
'''
Test that process pools can be re-used. This is important for avoiding
forking costs.
'''
processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 2, 3])
processes = vimap.pool.fork(
worker_proc.init_args(init=i) for i in [1, 2, 3])

results = list(processes.imap([4, 4, 4]).zip_in_out(close_if_done=False))
results = list(
processes.imap([4, 4, 4]).zip_in_out(close_if_done=False))
assert set(results) == set([(4, 5), (4, 6), (4, 7)])

results = list(processes.imap([4, 4, 4]).zip_in_out(close_if_done=True))
results = list(
processes.imap([4, 4, 4]).zip_in_out(close_if_done=True))
assert set(results) == set([(4, 5), (4, 6), (4, 7)])

def test_really_parallel(self):
'''Make sure things run in parallel: Determine that different threads are
handling different inputs (via time.sleep stuff). This could fail if the
sleep values are too small to compensate for the forking overhead.
'''Make sure things run in parallel: Determine that different
threads are handling different inputs (via time.sleep stuff).
This could fail if the sleep values are too small to
compensate for the forking overhead.
'''
processes = vimap.pool.fork(worker_proc.init_args(init=i) for i in [1, 2, 3])
processes = vimap.pool.fork(
worker_proc.init_args(init=i) for i in [1, 2, 3])
results = []
for input, output in processes.imap([4, 4, 4] * 3).zip_in_out():
results.append((input, output))
Expand Down
1 change: 0 additions & 1 deletion tests/chunking_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import vimap.pool
import vimap.worker_process


basic_worker = vimap.worker_process.worker(lambda inputs: inputs)


Expand Down
62 changes: 42 additions & 20 deletions tests/closes_fds_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from vimap.testing import repeat_test_to_catch_flakiness
import vimap.worker_process


# decrypt POSIX stuff
readable_mode_strings = {
'directory': stat.S_ISDIR,
Expand All @@ -32,7 +31,6 @@
'symlink': stat.S_ISLNK,
'socket': stat.S_ISSOCK}


FDInfo = namedtuple("FDInfo", ["modes", "symlink"])


Expand All @@ -41,13 +39,15 @@ def current_proc_fd_dir(*subpaths):


def fd_type_if_open(fd_number):
"""For a given open file descriptor, return information about that file descriptor.
"""For a given open file descriptor,
return information about that file descriptor.

'modes' are a list of human-readable strings describing the file type;
'symlink' is the target of the file descriptor (often a pipe name)
"""
fd_stat = os.fstat(fd_number)
modes = [k for k, v in readable_mode_strings.items() if v(fd_stat.st_mode)]
modes = [k for k, v in readable_mode_strings.items() if
v(fd_stat.st_mode)]
if os.path.isdir(current_proc_fd_dir()):
return FDInfo(
modes=modes,
Expand All @@ -61,21 +61,25 @@ def list_fds_linux():
fds = [
(int(i), current_proc_fd_dir(str(i)))
for i in os.listdir(current_proc_fd_dir())]
# NOTE: Sometimes, an FD is used to list the above directory. Hence, we should
# NOTE: Sometimes, an FD is used to list the above directory.
# Hence, we should
# re-check whether the FD still exists (via os.path.exists)
return [i for (i, path) in fds if (i >= 3 and os.path.exists(path))]


def list_fds_other():
"""A method to list open FDs that doesn't need /proc/{pid}."""
max_fds_soft, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
if max_fds_soft == resource.RLIM_INFINITY or not (3 < max_fds_soft < 4096):
if max_fds_soft == resource.RLIM_INFINITY or not (
3 < max_fds_soft < 4096):
logging.warning(
"max_fds_soft invalid ({0}), assuming 4096 is a sufficient upper bound"
"max_fds_soft invalid ({0}), "
"assuming 4096 is a sufficient upper bound"
.format(max_fds_soft))
max_fds_soft = 4096

# The first three FDs are stdin, stdout, and stderr. We're interested in
# The first three FDs are stdin, stdout, and stderr.
# We're interested in
# everything after.
for i in xrange(3, max_fds_soft):
try:
Expand Down Expand Up @@ -113,12 +117,17 @@ def difference_open_fds(before, after):
those FDs which were opened (present in `after` but not `before`) and
closed.
"""

# "a - b" for dicts -- remove anything in 'a' that has a key in b
def dict_diff(a, b):
return dict((k, a[k]) for k in (frozenset(a) - frozenset(b)))

for k in (frozenset(after) & frozenset(before)):
if before[k] != after[k]:
print("WARNING: FD {0} changed from {1} to {2}".format(k, before[k], after[k]))
print(
"WARNING: FD {0} changed from {1} to {2}".format(k,
before[k],
after[k]))
return {
'closed': dict_diff(before, after),
'opened': dict_diff(after, before)}
Expand All @@ -128,6 +137,7 @@ class TestOpenFdsMethods(T.TestCase):
"""
Tests that we can detect open file descriptors.
"""

def test_open_fds(self):
first = get_open_fds()
fd = open('vimap/pool.py', 'r')
Expand All @@ -136,10 +146,14 @@ def test_open_fds(self):
fd.close()
third = get_open_fds()
fd2.close()
T.assert_equal(len(difference_open_fds(first, second)['opened']), 2)
T.assert_equal(len(difference_open_fds(first, second)['closed']), 0)
T.assert_equal(len(difference_open_fds(second, third)['closed']), 1)
T.assert_equal(len(difference_open_fds(second, third)['opened']), 0)
T.assert_equal(len(difference_open_fds(first, second)['opened']),
2)
T.assert_equal(len(difference_open_fds(first, second)['closed']),
0)
T.assert_equal(len(difference_open_fds(second, third)['closed']),
1)
T.assert_equal(len(difference_open_fds(second, third)['opened']),
0)


@vimap.worker_process.worker
Expand All @@ -160,6 +174,7 @@ def instrumented_init(*args, **kwargs):
self.queue_fds = difference_open_fds(
self.before_queue_manager_init,
self.after_queue_manager_init)['opened']

with mock.patch.object(
vimap.queue_manager.VimapQueueManager,
'__init__',
Expand All @@ -175,22 +190,29 @@ def test_all_fds_cleaned_up(self):
after_finish_open_fds = get_open_fds()

# Check that some FDs were opened after forking
after_fork = difference_open_fds(initial_open_fds, after_fork_open_fds)
after_fork = difference_open_fds(initial_open_fds,
after_fork_open_fds)
# T.assert_equal(after_fork['closed'], [])
T.assert_gte(len(after_fork['opened']), 2) # should have at least 3 open fds
T.assert_gte(len(after_fork['opened']),
2) # should have at least 3 open fds
# All opened files should be FIFOs
if not all(info.modes == ['fifo'] for info in after_fork['opened'].values()):
if not all(info.modes == ['fifo'] for info in
after_fork['opened'].values()):
print("Infos: {0}".format(after_fork['opened']))
T.assert_not_reached("Some infos are not FIFOs")

after_cleanup = difference_open_fds(after_fork_open_fds, after_finish_open_fds)
after_cleanup = difference_open_fds(after_fork_open_fds,
after_finish_open_fds)
T.assert_gte(len(after_cleanup['closed']), 2)

left_around = difference_open_fds(initial_open_fds, after_finish_open_fds)
left_around = difference_open_fds(initial_open_fds,
after_finish_open_fds)
if len(left_around['opened']) != 0:
queue_fds_left_around = dict(
item for item in self.queue_fds.items() if item[0] in left_around['opened'])
print("Queue FDs left around: {0}".format(queue_fds_left_around))
item for item in self.queue_fds.items() if
item[0] in left_around['opened'])
print(
"Queue FDs left around: {0}".format(queue_fds_left_around))
T.assert_equal(len(left_around['opened']), 0)


Expand Down
10 changes: 7 additions & 3 deletions tests/exception_handling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def test_formatted_traceback(self):
testify.assert_in("in get_ec", ec.formatted_traceback)

def test_formatted_exception(self):
testify.assert_equal("ValueError: hi", self.get_ec().formatted_exception)
testify.assert_equal("ValueError: hi",
self.get_ec().formatted_exception)

def test_reraise(self):
testify.assert_raises_and_contains(
Expand All @@ -51,14 +52,16 @@ def test_unpickleable(self):
cPickle.dumps(ec) # make sure it can indeed be serialized
testify.assert_isinstance(ec, exception_handling.ExceptionContext)
testify.assert_isinstance(ec.value, Exception)
testify.assert_equal(str(ec.value), "UNPICKLEABLE AND UNSERIALIZABLE MESSAGE")
testify.assert_equal(str(ec.value),
"UNPICKLEABLE AND UNSERIALIZABLE MESSAGE")

def test_unpickleable_with_uninitializable_exception(self):
"""
Tests an exception that can't be pickled (due to CustomException
not being in global variables) and one that can't be reinitialized
with a single string argument.
"""

class CustomException(Exception):
def __init__(self, a, b):
self.a, self.b = a, b
Expand All @@ -72,5 +75,6 @@ def __str__(self):
except:
ec = exception_handling.ExceptionContext.current()
cPickle.dumps(ec) # make sure it can indeed be serialized
testify.assert_isinstance(ec.value, exception_handling.UnpickleableException)
testify.assert_isinstance(ec.value,
exception_handling.UnpickleableException)
testify.assert_equal(str(ec.value), "CustomException: 3, 4")
Loading