Skip to content

Commit 3926a01

Browse files
author
TJ Xu
authored
Add an option to pin to gpu for all estimators (horovod#3526)
* Add an option to pin to gpu for all estimators * Fix CI by downloading nv keys directly
1 parent 33413af commit 3926a01

File tree

11 files changed

+91
-41
lines changed

11 files changed

+91
-41
lines changed

Dockerfile.test.gpu

+7
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ ENV DEBIAN_FRONTEND=noninteractive
2929
# Set default shell to /bin/bash
3030
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
3131

32+
# Extract ubuntu distribution version and download the corresponding key.
33+
# This is to fix CI failures caused by the new rotating key mechanism rolled out by Nvidia.
34+
# Refer to https://forums.developer.nvidia.com/t/notice-cuda-linux-repository-key-rotation/212771 for more details.
35+
RUN DIST=$(echo ${CUDA_DOCKER_VERSION#*ubuntu} | sed 's/\.//'); \
36+
apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu${DIST}/x86_64/3bf863cc.pub && \
37+
apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu${DIST}/x86_64/7fa2af80.pub
38+
3239
# Prepare to install specific g++ versions
3340
RUN apt-get update -qq && apt-get install -y --no-install-recommends software-properties-common
3441
RUN add-apt-repository ppa:ubuntu-toolchain-r/test

docker/horovod-ray/Dockerfile

+7
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ ENV DEBIAN_FRONTEND=noninteractive
1616
# Set default shell to /bin/bash
1717
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
1818

19+
# Download the corresponding key for ubuntu1804.
20+
# This is to fix CI failures caused by the new rotating key mechanism rolled out by Nvidia.
21+
# Refer to https://forums.developer.nvidia.com/t/notice-cuda-linux-repository-key-rotation/212771 for more details.
22+
ARG APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1
23+
RUN sudo apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/3bf863cc.pub
24+
RUN sudo apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64/7fa2af80.pub
25+
1926
RUN sudo apt-get update && DEBIAN_FRONTEND="noninteractive" sudo apt-get install -y \
2027
build-essential \
2128
cmake \

docker/horovod/Dockerfile

+7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ ENV DEBIAN_FRONTEND=noninteractive
2424
# Set default shell to /bin/bash
2525
SHELL ["/bin/bash", "-euo", "pipefail", "-c"]
2626

27+
# Extract ubuntu distribution version and download the corresponding key.
28+
# This is to fix CI failures caused by the new rotating key mechanism rolled out by Nvidia.
29+
# Refer to https://forums.developer.nvidia.com/t/notice-cuda-linux-repository-key-rotation/212771 for more details.
30+
RUN DIST=$(echo ${CUDA_DOCKER_VERSION#*ubuntu} | sed 's/\.//'); \
31+
apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu${DIST}/x86_64/3bf863cc.pub && \
32+
apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu${DIST}/x86_64/7fa2af80.pub
33+
2734
RUN apt-get update && apt-get install -y --allow-downgrades --allow-change-held-packages --no-install-recommends \
2835
build-essential \
2936
cmake \

horovod/spark/common/params.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,16 @@ class EstimatorParams(Params):
9696

9797
label_shapes = Param(Params._dummy(), 'label_shapes', 'specifies the shape (or shapes) of the label column (or columns)')
9898

99+
inmemory_cache_all = Param(Params._dummy(), 'inmemory_cache_all',
100+
'Cache the data in memory for training and validation.',
101+
typeConverter=TypeConverters.toBoolean)
102+
103+
use_gpu = Param(Params._dummy(), 'use_gpu',
104+
'Whether to use the GPU for training. '
105+
'Setting this to False will skipping binding to GPU even when GPU is available. '
106+
'Defaults to True.',
107+
typeConverter=TypeConverters.toBoolean)
108+
99109
def __init__(self):
100110
super(EstimatorParams, self).__init__()
101111

@@ -129,7 +139,9 @@ def __init__(self):
129139
train_reader_num_workers=2,
130140
val_reader_num_workers=2,
131141
reader_pool_type='process',
132-
label_shapes=None)
142+
label_shapes=None,
143+
inmemory_cache_all=False,
144+
use_gpu=True)
133145

134146
def _check_params(self, metadata):
135147
model = self.getModel()
@@ -334,6 +346,17 @@ def setLabelShapes(self, value):
334346
def getLabelShapes(self):
335347
return self.getOrDefault(self.label_shapes)
336348

349+
def setInMemoryCacheAll(self, value):
350+
return self._set(inmemory_cache_all=value)
351+
352+
def getInMemoryCacheAll(self):
353+
return self.getOrDefault(self.inmemory_cache_all)
354+
355+
def setUseGpu(self, value):
356+
self._set(use_gpu=value)
357+
358+
def getUseGpu(self):
359+
return self.getOrDefault(self.use_gpu)
337360

338361
class ModelParams(HasOutputCols):
339362
history = Param(Params._dummy(), 'history', 'history')

horovod/spark/keras/estimator.py

+3-11
Original file line numberDiff line numberDiff line change
@@ -147,14 +147,12 @@ class KerasEstimator(HorovodEstimator, KerasEstimatorParamsReadable,
147147
inmemory_cache_all: boolean value. Cache the data in memory for training and validation. Default: False.
148148
backend_env: dict to add to the environment of the backend. Defaults to setting the java heap size to
149149
2G min and max for libhdfs through petastorm
150+
use_gpu: Whether to use the GPU for training. Defaults to True.
150151
"""
151152

152153
custom_objects = Param(Params._dummy(), 'custom_objects', 'custom objects')
153154
checkpoint_callback = Param(Params._dummy(), 'checkpoint_callback',
154155
'model checkpointing callback')
155-
inmemory_cache_all = Param(Params._dummy(), 'inmemory_cache_all',
156-
'Cache the data in memory for training and validation.',
157-
typeConverter=TypeConverters.toBoolean)
158156
backend_env = Param(Params._dummy(), "backend_env",
159157
"dict to add to the environment of the command run on the environment")
160158

@@ -192,14 +190,14 @@ def __init__(self,
192190
label_shapes=None,
193191
checkpoint_callback=None,
194192
inmemory_cache_all=False,
195-
backend_env=None):
193+
backend_env=None,
194+
use_gpu=True):
196195

197196
super(KerasEstimator, self).__init__()
198197

199198
self._setDefault(optimizer=None,
200199
custom_objects={},
201200
checkpoint_callback=None,
202-
inmemory_cache_all=False,
203201
backend_env={'LIBHDFS_OPTS': '-Xms2048m -Xmx2048m'})
204202

205203
kwargs = self._input_kwargs
@@ -235,12 +233,6 @@ def setCheckpointCallback(self, value):
235233
def getCheckpointCallback(self):
236234
return self.getOrDefault(self.checkpoint_callback)
237235

238-
def setInMemoryCacheAll(self, value):
239-
return self._set(inmemory_cache_all=value)
240-
241-
def getInMemoryCacheAll(self):
242-
return self.getOrDefault(self.inmemory_cache_all)
243-
244236
def setBackendEnv(self, value):
245237
self._set(backend_env=value)
246238

horovod/spark/keras/remote.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def RemoteTrainer(estimator, metadata, keras_utils, run_id, dataset_idx):
5252
user_verbose = estimator.getVerbose()
5353
checkpoint_callback = estimator.getCheckpointCallback()
5454
inmemory_cache_all = estimator.getInMemoryCacheAll()
55+
should_use_gpu = estimator.getUseGpu()
5556

5657
# Data reader parameters
5758
train_reader_worker_count = estimator.getTrainReaderNumWorker()
@@ -111,7 +112,16 @@ def train(serialized_model, train_rows, val_rows, avg_row_size):
111112
hvd = get_horovod()
112113
hvd.init()
113114

114-
pin_gpu(hvd, tf, k)
115+
# Verbose mode 1 will print a progress bar
116+
verbose = user_verbose if hvd.rank() == 0 else 0
117+
118+
if should_use_gpu:
119+
if verbose:
120+
print("Pinning current process to the GPU.")
121+
pin_gpu(hvd, tf, k)
122+
else:
123+
if verbose:
124+
print("Skip pinning current process to the GPU.")
115125

116126
if random_seed is not None:
117127
if LooseVersion(tf.__version__) < LooseVersion('2.0.0'):
@@ -137,8 +147,6 @@ def train(serialized_model, train_rows, val_rows, avg_row_size):
137147
scaled_lr = k.backend.get_value(model.optimizer.lr) * hvd.size()
138148
k.backend.set_value(model.optimizer.lr, scaled_lr)
139149

140-
# Verbose mode 1 will print a progress bar
141-
verbose = user_verbose if hvd.rank() == 0 else 0
142150

143151
if verbose:
144152
print(f"Shared lib path is pointing to: {_horovod.common.process_sets._basics.MPI_LIB_CTYPES}")

horovod/spark/lightning/estimator.py

+3-12
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ class TorchEstimator(HorovodEstimator, TorchEstimatorParamsWritable,
181181
debug_data_loader: (Optional)Debugging flag for data loader.
182182
train_async_data_loader_queue_size: (Optional) Size of train async data loader queue.
183183
val_async_data_loader_queue_size: (Optional) Size of val async data loader queue.
184+
use_gpu: Whether to use the GPU for training. Defaults to True.
184185
"""
185186

186187
input_shapes = Param(Params._dummy(), 'input_shapes', 'input layer shapes')
@@ -189,10 +190,6 @@ class TorchEstimator(HorovodEstimator, TorchEstimatorParamsWritable,
189190
train_minibatch_fn = Param(Params._dummy(), 'train_minibatch_fn',
190191
'functions that construct the minibatch train function for torch')
191192

192-
inmemory_cache_all = Param(Params._dummy(), 'inmemory_cache_all',
193-
'Cache the data in memory for training and validation.',
194-
typeConverter=TypeConverters.toBoolean)
195-
196193
num_gpus = Param(Params._dummy(), 'num_gpus',
197194
'Number of gpus per process, default to 1 when CUDA is available in the backend, otherwise 0.')
198195

@@ -266,14 +263,14 @@ def __init__(self,
266263
profiler=None,
267264
debug_data_loader=False,
268265
train_async_data_loader_queue_size=None,
269-
val_async_data_loader_queue_size=None):
266+
val_async_data_loader_queue_size=None,
267+
use_gpu=True):
270268

271269
super(TorchEstimator, self).__init__()
272270
self._setDefault(loss_constructors=None,
273271
input_shapes=None,
274272
train_minibatch_fn=None,
275273
transformation_fn=None,
276-
inmemory_cache_all=False,
277274
num_gpus=None,
278275
logger=None,
279276
log_every_n_steps=50,
@@ -315,12 +312,6 @@ def setLossConstructors(self, value):
315312
def getLossConstructors(self):
316313
return self.getOrDefault(self.loss_constructors)
317314

318-
def setInMemoryCacheAll(self, value):
319-
return self._set(inmemory_cache_all=value)
320-
321-
def getInMemoryCacheAll(self):
322-
return self.getOrDefault(self.inmemory_cache_all)
323-
324315
def setNumGPUs(self, value):
325316
return self._set(num_gpus=value)
326317

horovod/spark/lightning/remote.py

+10
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ def RemoteTrainer(estimator, metadata, ckpt_bytes, run_id, dataset_idx, train_ro
6464
debug_data_loader = estimator.getDebugDataLoader()
6565
train_async_data_loader_queue_size = estimator.getTrainAsyncDataLoaderQueueSize()
6666
val_async_data_loader_queue_size = estimator.getValAsyncDataLoaderQueueSize()
67+
should_use_gpu = estimator.getUseGpu()
6768

6869
# get logger
6970
logger = estimator.getLogger()
@@ -194,7 +195,16 @@ def on_epoch_end(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -
194195
f"Val rows: {val_rows}, Val batch size: {val_batch_size}, Val_steps_per_epoch: {_val_steps_per_epoch}\n"
195196
f"Checkpoint file: {remote_store.checkpoint_path}, Logs dir: {remote_store.logs_path}\n")
196197

198+
if not should_use_gpu and verbose:
199+
print("Skip pinning current process to the GPU.")
200+
197201
cuda_available = torch.cuda.is_available()
202+
203+
if cuda_available and not should_use_gpu:
204+
print("GPU is available but use_gpu is set to False."
205+
"Training will proceed without GPU support.")
206+
cuda_available = False
207+
198208
# We need to check all ranks have same device type for traning.
199209
# Horovod doesn't support heterogeneous allreduce for gradients.
200210
cuda_avail_list = hvd.allgather_object(cuda_available, name='device type')

horovod/spark/torch/estimator.py

+5-13
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@ class TorchEstimator(HorovodEstimator, TorchEstimatorParamsWritable,
147147
val_reader_num_workers: Similar to the train_reader_num_workers.
148148
reader_pool_type: Type of worker pool used to parallelize reading data from the dataset.
149149
Should be one of ['thread', 'process']. Defaults to 'process'.
150+
inmemory_cache_all: (Optional) Cache the data in memory for training and validation.
151+
use_gpu: Whether to use the GPU for training. Defaults to True.
150152
"""
151153

152154
input_shapes = Param(Params._dummy(), 'input_shapes', 'input layer shapes')
@@ -155,10 +157,6 @@ class TorchEstimator(HorovodEstimator, TorchEstimatorParamsWritable,
155157
train_minibatch_fn = Param(Params._dummy(), 'train_minibatch_fn',
156158
'functions that construct the minibatch train function for torch')
157159

158-
inmemory_cache_all = Param(Params._dummy(), 'inmemory_cache_all',
159-
'Cache the data in memory for training and validation.',
160-
typeConverter=TypeConverters.toBoolean)
161-
162160
@keyword_only
163161
def __init__(self,
164162
num_proc=None,
@@ -193,14 +191,14 @@ def __init__(self,
193191
val_reader_num_workers=None,
194192
reader_pool_type=None,
195193
label_shapes=None,
196-
inmemory_cache_all=False):
194+
inmemory_cache_all=False,
195+
use_gpu=True):
197196

198197
super(TorchEstimator, self).__init__()
199198
self._setDefault(loss_constructors=None,
200199
input_shapes=None,
201200
train_minibatch_fn=None,
202-
transformation_fn=None,
203-
inmemory_cache_all=False)
201+
transformation_fn=None)
204202

205203
kwargs = self._input_kwargs
206204

@@ -227,12 +225,6 @@ def setLossConstructors(self, value):
227225
def getLossConstructors(self):
228226
return self.getOrDefault(self.loss_constructors)
229227

230-
def setInMemoryCacheAll(self, value):
231-
return self._set(inmemory_cache_all=value)
232-
233-
def getInMemoryCacheAll(self):
234-
return self.getOrDefault(self.inmemory_cache_all)
235-
236228
def _get_optimizer(self):
237229
return self.getOrDefault(self.optimizer)
238230

horovod/spark/torch/remote.py

+10
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def RemoteTrainer(estimator, metadata, last_checkpoint_state, run_id, dataset_id
6060
transformation_fn = estimator.getTransformationFn()
6161
transformation = transformation_fn if transformation_fn else None
6262
inmemory_cache_all = estimator.getInMemoryCacheAll()
63+
should_use_gpu = estimator.getUseGpu()
6364

6465
# If loss weight is not provided, use equal loss for all the labels
6566
loss_weights = estimator.getLossWeights()
@@ -134,7 +135,16 @@ def train(serialized_model, optimizer_cls, model_opt_state_serialized,
134135
raise ValueError("user_shuffle_buffer_size cannot be negative!")
135136
shuffle_buffer_size = user_shuffle_buffer_size
136137

138+
if not should_use_gpu and user_verbose:
139+
print("Skip pinning current process to the GPU.")
140+
137141
cuda_available = torch.cuda.is_available()
142+
143+
if cuda_available and not should_use_gpu:
144+
print("GPU is available but use_gpu is set to False."
145+
"Training will proceed without GPU support.")
146+
cuda_available = False
147+
138148
# We need to check all ranks have same device type for traning.
139149
# Horovod doesn't support heterogeneous allreduce for gradients.
140150
cuda_avail_list = hvd.allgather_object(cuda_available, name='device type')

test/integration/test_spark_keras.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,10 @@ def test_fit_model(self):
9898
batch_size=1,
9999
random_seed=1,
100100
epochs=3,
101-
verbose=2)
101+
verbose=2,
102+
use_gpu=False)
103+
104+
assert not keras_estimator.getUseGpu()
102105

103106
keras_model = keras_estimator.fit(df)
104107

0 commit comments

Comments
 (0)