# content of conftest.py
import copy
import warnings

import pytest

import dask
import distributed.utils_test

# Make loop fixture available in all tests
from distributed.utils_test import loop, loop_in_thread  # noqa: F401

import dask_jobqueue.config
import dask_jobqueue.lsf
from dask_jobqueue import (
    PBSCluster,
    MoabCluster,
    SLURMCluster,
    SGECluster,
    LSFCluster,
    OARCluster,
    HTCondorCluster,
)
from dask_jobqueue.local import LocalCluster


def pytest_addoption(parser):
parser.addoption(
"-E",
action="store",
metavar="NAME",
help="only run tests matching the environment NAME.",
)
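
# Illustrative usage of the -E option registered above (sketch only; the valid
# NAME values are the non-None keys of ``all_envs`` defined further down, e.g.
# "pbs", "slurm", "sge"):
#
#     pytest -E slurm
#
# Without -E, only tests that do not require a specific scheduler environment
# are run.
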
def pytest_configure(config):
# register additional markers
config.addinivalue_line(
"markers", "env(NAME): only run test if environment NAME matches"
)
config.addinivalue_line(
"markers", "xfail_env(NAME): known failure for environment NAME"
)
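
# Sketch of how the markers registered above are typically applied in a test
# module (hypothetical test names, illustrative only):
#
#     @pytest.mark.env("pbs")
#     def test_needs_real_pbs():
#         ...  # runs only with "pytest -E pbs"; skipped otherwise
#
#     @pytest.mark.xfail_env({"sge": "known limitation on the SGE CI setup"})
#     def test_flaky_on_sge():
#         ...  # expected to fail when run with "pytest -E sge"
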
def pytest_runtest_setup(item):
warnings.filterwarnings(
"ignore", message="Port 8787 is already in use", category=UserWarning
)
env = item.config.getoption("-E")
envnames = sum(
[
mark.args[0] if isinstance(mark.args[0], list) else [mark.args[0]]
for mark in item.iter_markers(name="env")
],
[],
)
if (
None not in envnames
and (env is None and envnames)
or (env is not None and env not in envnames)
):
pytest.skip("test requires env in %r" % envnames)
else:
xfail = {}
        for mark in item.iter_markers(name="xfail_env"):
            xfail.update(mark.args[0])
if env in xfail:
item.add_marker(pytest.mark.xfail(reason=xfail[env]))
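
# For example (continuing the sketch above): with "pytest -E pbs", a test marked
# ``env("pbs")`` runs, a test marked ``env("slurm")`` or carrying no env marker
# is skipped, and a test that would otherwise run but is marked
# ``xfail_env({"pbs": "..."})`` is expected to fail. Without -E, only tests
# without a specific env marker run.
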
all_envs = {
None: LocalCluster,
"pbs": PBSCluster,
"moab": MoabCluster,
"slurm": SLURMCluster,
"sge": SGECluster,
"lsf": LSFCluster,
"oar": OARCluster,
"htcondor": HTCondorCluster,
}
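
# The None entry backs generic runs that only need LocalCluster (no -E given),
# while every other key doubles as a valid value for the -E option and selects
# the matching cluster class in the fixtures below.
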
# Override the cleanup method from distributed that has been added to the loop
# fixture, because it just wipes the main loop in our tests, and dask-jobqueue
# is not ready for this.
# FIXME
@pytest.fixture
def cleanup():
dask_jobqueue.config.reconfigure()
    yield


# Override the distributed.utils_test.reset_config() method because it resets
# the config used by our tests.
# FIXME
def reset_config():
dask.config.config.clear()
dask.config.config.update(copy.deepcopy(distributed.utils_test.original_config))
dask_jobqueue.config.reconfigure()
distributed.utils_test.reset_config = reset_config
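
# With this patch in place, code in distributed's test utilities that calls
# distributed.utils_test.reset_config() between tests restores the original dask
# defaults *and* re-applies the dask-jobqueue defaults instead of leaving the
# jobqueue settings wiped, e.g. (hypothetical check, assuming the jobqueue
# defaults live under a top-level "jobqueue" config key):
#
#     distributed.utils_test.reset_config()
#     assert "jobqueue" in dask.config.config
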
@pytest.fixture(
params=[pytest.param(v, marks=[pytest.mark.env(k)]) for (k, v) in all_envs.items()]
)
def EnvSpecificCluster(request):
"""Run test only with the specific cluster class set by the environment"""
if request.param == HTCondorCluster:
# HTCondor requires explicitly specifying requested disk space
dask.config.set({"jobqueue.htcondor.disk": "1GB"})
return request.param
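
# Hypothetical use of the fixture above (sketch only): the test body runs with
# the single cluster class selected by -E and is skipped for other environments,
# so it may talk to the real scheduler.
#
#     def test_against_real_scheduler(EnvSpecificCluster):
#         with EnvSpecificCluster(cores=1, memory="1GB") as cluster:
#             cluster.scale(1)  # submits to the scheduler selected via -E
#             ...
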
@pytest.fixture(params=list(all_envs.values()))
def Cluster(request):
"""Run test for each cluster class when no environment is set (test should not require the actual scheduler)"""
if request.param == HTCondorCluster:
# HTCondor requires explicitly specifying requested disk space
dask.config.set({"jobqueue.htcondor.disk": "1GB"})
return request.param
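
# Hypothetical use of the Cluster fixture (sketch only): the test body runs once
# per cluster class without requiring the corresponding batch system, so it
# should stick to behaviour that works locally.
#
#     def test_cluster_kwargs(Cluster):
#         with Cluster(cores=1, memory="1GB") as cluster:
#             ...  # e.g. inspect generated configuration, no jobs submitted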