Skip to content

Commit

Permalink
Support horovod (#524)
Browse files Browse the repository at this point in the history
* Support horovod

* Optimize horovod_driver code

* Optimize horovod driver test case

* Add horovod example

* Add some todo issue and remove test temporarily

* Add design proposal

* Add more tests

* Remove horovod_driver in test resources and optimize doc

* Start driver on task executor

* Add registerCallbackInfo rpc call and adopt it

* fix bug of build workerlist in horovod

* Update proposal

* Add horovod should pass test

* Fix some bugs

* Add file newline character

* Add more comment on code

* Fix test case failed
  • Loading branch information
zuston authored Apr 24, 2021
1 parent 5230f93 commit 00e88b0
Show file tree
Hide file tree
Showing 24 changed files with 1,803 additions and 14 deletions.
235 changes: 235 additions & 0 deletions docs/proposals/horovod-on-tony.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
- [Motivation](#motivation)
- [Goals](#goals)
- [API](#api)
- [Design](#design)
- [Usage](#usage)

## Motivation
The purpose of Horovod is to make it easy to take a single-GPU training script and successfully scale it to train across many GPUs in parallel. This has two aspects:
1. Easy to use
2. Run faster in distributed mode

However, because of the limitation of SSH mechanism (hard to support SSH on Yarn), TonY don't support horovod with **MPI controller**. With the help of **Gloo controller**, we are expecting to support horovod.

## Goals
1. Support Horovod on TonY (limited static topology support)
2. Elastic Horovod will be supported later.

## API
There are no API changes to other machine learning frameworks.

## Design
Attention: It's gloo controller that makes TonY support horovod.

From the perspective of compatibility and maintainability, it's better to directly use the [gloo_runner](https://github.com/horovod/horovod/blob/master/horovod/runner/gloo_run.py) on TonY. But after reading through Horovod's code, I find it difficult to reuse gloo_runner's code on TonY, because the existence of some unrelated codes will lead driver to start the worker through the SSH command, which is hard to be supportted on Yarn.

After having a deep understanding of Horovod code and [communicating with developers](https://github.com/horovod/horovod/discussions/2785), i know that the Gloo controller uses a rendezvous server to assign each worker role, and provides HTTP API for workers to obtain cluster information. So each worker can build a training cluster and start training at the same time.

Horovod is served as two roles, worker and driver. Driver is responsible for starting the rendezvous server and will not participate in training (no GPU required, lightweight). Before starting, driver need to know all workers' hostnames in advance. The worker is only responsible for training. According to TonY's architecture (**Application master** and **task executor**), the design can be as follows.

### Horovod Driver
__How to start rendezvous server__
Reusing Horovod rendezvous server code, we introduce tony-horovod driver launcher to offer a python script
```python
# Init the horovod rendezous server
global_rendezv = RendezvousServer(verbose=1)
# Output server port, which will be used horovod worker to connect server.
global_rendezv_port = global_rendezv.start()
print("Rendezvous server started, port: " + str(global_rendezv_port))

hosts = parse_hosts(worker_list)
# Output the host plan, it will output local_rank, rank and so on.
host_alloc_plan = get_host_assignments(hosts, 1)

# Start the server.
global_rendezv.init(host_alloc_plan)
```

__When to start driver__
After all workers' resource have be assigned and TonY's Application master could get all workers' registry info.

__Where to start driver__
Two options
1. On TonY application master.
This will save resources(no extra resources to start driver), and the amount of code changes will be small. But by injecting relevant Horovod's driver code into AM, it is not elegant.
2. On TonY task executor.
Additional customization of the driver configuration is required and the startup of driver will be covered on TonY automatically. And it is necessary to coordinate the startup sequence between the driver and other workers, because driver should start before worker.

Second option will be adopted in this PR.
In order to unify different machine framework startup, we supposed to create `FrameworkRuntime` interface, it will expose methods as follows
```java
/** For AM, getting cluster spec and return to task exectuor **/
String constructClusterSpec(String taskId) throws IOException;

/** For AM, when app finished, it need to call it to release resource **/
void destroy();

/** For AM, init the tony session **/
void setTonySession(final TonySession session);

/** For AM, it ensures that each task executor start sequence. like Horovod driver should start before workers **/
boolean canStartTask(TonyConfigurationKeys.DistributedMode distributedMode, String taskId);

/** For AM, it will pre-check tony conf and inject some params. like horovod runtime will inject driver config into it. **/
boolean validateAndUpdateConfig(Configuration tonyConf);

/**
* For AM, it will receive some callback info from task executor.
* This method will be called when Application Master accepting task executors' callback info.
* This method is suitable for the task executors that have a dependency of startup sequence,
* and the start of downstream tasks needs to rely on the info after the start of the upstream task.
*/
boolean receiveTaskCallbackInfo(String taskId, String callbackInfo);

/** For TaskExecutor, execute task process **/
int run(TaskExecutor executor) throws Exception;
```

So, we need to create `HorovodRuntime` to support it. Besides, TF/PyTorch/MXNet will also be supported in independent runtime, like `TFRuntime`.

As stated in the design above, Horovod driver should be started on one task executor and before other workers. So in `HorovodRuntime`, we can use `canStartTask` method to coordinate task executor startup sequence.

Besides, how to start Horovod driver? I think we can create `HorovodDriver` class to do it. Its methods as follows.
```java
public class HorovodDriver {
public final Process taskProcess;
public final int port;
public final List<SlotInfo> slotInfoList;

// For TaskExecutor to start horovod driver, it will start rendezvous server
public synchronized static HorovodDriver create(String workerList) throws Exception {
return startRendezvousServer(workerList);
}

private static HorovodDriver startRendezvousServer(String workerlist) throws Exception {
...
}

public void close() {
if (taskProcess != null) {
killProcess(taskProcess);
}
}

// For TaskExecutor to wait process finish, it will hang until python Process exit.
public int waitFor(long timeout) throws InterruptedException {
this.taskProcess.waitFor(timeout, TimeUnit.MICROSECONDS);
return this.taskProcess.exitValue();
}
}
```

How to extend `HorovodRuntime`. pseudo code as follows
```java
public class HorovodRuntime implements MLFrameworkRuntime {
private volatile boolean isDriverReady = false;

private List<SlotInfo> workerSlotMetaInfo;
private String rendezvServerPort;
private String rendezvServerHost;

@Override
public String constructClusterSpec(String taskId) throws IOException {
// when task is Driver, it will return worker list to driver, and make it start rendezvous server

// when task is worker, it will return rendezouvs server's slot info to worker
}

@Override
public boolean receiveTaskCallbackInfo(String taskId, String callbackInfo) {
// when role is driver, AM will accept driver's callback info, which is slot info including horovod
// host plan. It will be recorded in runtime and give to workers.
}

@Override
public boolean canStartTask(TonyConfigurationKeys.DistributedMode distributedMode, String taskId) {
// coordinate startup sequence
}

@Override
public boolean preCheck(Configuration tonyConf) {
// inject driver conf and make it untracked.
tonyConf.set("tony.driver.instances", "1");
tonyConf.set("tony.driver.vcores", "1");
tonyConf.set("tony.application.untracked.jobtypes", "driver");
return true;
}

// ===================For task executor=======================

public void buildTaskEnv(TaskExecutor executor) throws Exception {
// set env for worker, like HOROVOD_CONTROLLER, HOST
}

@Override
public int run(TaskExecutor executor) throws Exception {
buildTaskEnv(executor);
// if it is driver, it will launcher horovod driver and register info to AM
if (DRIVER.equals(executor.getJobName())) {
HorovodDriver driver = HorovodDriver.create(executor.getClusterSpec());
String callBackInfo = driver.getCallbackInfo();
log.info("Horovod driver call back to AM: \n" + callBackInfo);
executor.registerCallbackInfo(callBackInfo);
int exitCode = driver.waitFor();
return exitCode;
}

// if it is worker, it will execute training script directly.
return this.executorPythonShell(executor);
}
}
```

### Horovod Worker
__where to start worker__
Only on TonY's task executor.

__How to start worker__
Just like start tensorflow task.
But some envs should be injected before starting worker.

```
HOROVOD_CONTROLLER=gloo
HOROVOD_CPU_OPERATIONS=gloo
HOROVOD_GLOO_TIMEOUT_SECONDS=2000
HOROVOD_GLOO_RENDEZVOUS_PORT=9999
HOROVOD_GLOO_RENDEZVOUS_ADDR=localhost
HOROVOD_CROSS_RANK=0
HOROVOD_CROSS_SIZE=1
HOROVOD_LOCAL_RANK=0
HOROVOD_LOCAL_SIZE=1
HOROVOD_SIZE=1
HOROVOD_RANK=0
HOROVOD_HOSTNAME=0.0.0.0
```
__How to get these horovod params?__
Acutally, these params are from **host_alloc_plan**(mentioned in previous python code). The python script should output these params and AM will get them and assign to task executor.

## Usage
tony-test.xml is as follows, more details are shown on tony-examples module.

```
<configuration>
<property>
<name>tony.worker.instances</name>
<value>4</value>
</property>
<property>
<name>tony.worker.memory</name>
<value>3g</value>
</property>
<property>
<name>tony.docker.enabled</name>
<value>true</value>
</property>
<property>
<name>tony.docker.containers.image</name>
<value>YOUR_DOCKER_IMAGE_ADDRESS</value>
</property>
<property>
<name>tony.application.framework</name>
<value>horovod</value>
</property>
</configuration>
```
17 changes: 6 additions & 11 deletions tony-core/src/main/java/com/linkedin/tony/ApplicationMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,6 @@ private void stop() {
}

frameworkRuntime.destroy();

nmClientAsync.stop();
amRMClient.stop();
// Poll until TonyClient signals we should exit
Expand Down Expand Up @@ -868,12 +867,6 @@ public Set<TaskInfo> getTaskInfos() {
return Collections.emptySet();
}

@Override
public String getClusterSpec() throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(session.getClusterSpec());
}

@Override
public void taskExecutorHeartbeat(String taskId) {
TonyTask task = session.getTask(taskId);
Expand All @@ -885,6 +878,12 @@ public void taskExecutorHeartbeat(String taskId) {
}
}

@Override
public String getClusterSpec() throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(session.getClusterSpec());
}

@Override
public String registerWorkerSpec(String taskId, String spec) throws IOException {
TonyTask task = session.getTask(taskId);
Expand All @@ -901,13 +900,9 @@ public String registerWorkerSpec(String taskId, String spec) throws IOException
killChiefWorkerIfTesting(taskId);
}

// two distributed mode (default is GANG) cases:
// 1. In FCFS mode, task will be allowed to run when AM accept worker registered spec,
// 2. In GANG mode, it will start until all tasks have registered.
if (frameworkRuntime.canStartTask(distributedMode, taskId)) {
return frameworkRuntime.constructClusterSpec(taskId);
}

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.hadoop.conf.Configuration;

import com.linkedin.tony.runtime.HorovodRuntime;
import com.linkedin.tony.runtime.MXNetRuntime;
import com.linkedin.tony.runtime.PyTorchRuntime;
import com.linkedin.tony.runtime.StandaloneRuntime;
Expand All @@ -35,6 +36,8 @@ static FrameworkRuntime get(TonyConfigurationKeys.FrameworkType frameworkType) {
return new PyTorchRuntime();
case MXNET:
return new MXNetRuntime();
case HOROVOD:
return new HorovodRuntime();
case STANDALONE:
return new StandaloneRuntime();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ private static TaskExecutor createExecutor() throws Exception {
TimeUnit.MILLISECONDS);

executor.setupPorts();

executor.clusterSpec = executor.registerAndGetClusterSpec();

if (executor.clusterSpec == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,10 @@ public static String getContainerDockerMountKey() {
// Configurations that can take multiple values.
public static final List<String> MULTI_VALUE_CONF = Collections.unmodifiableList(
Arrays.asList(CONTAINER_LAUNCH_ENV, EXECUTION_ENV, getContainerResourcesKey()));

// Local testing horovod driver
public static final String TEST_HOROVOD_FAIL_ENABLE_KEY = TONY_APPLICATION_PREFIX + "test.horovod-driver-fail-enable";
public static final boolean DEFAULT_TEST_HOROVOD_FAIL = false;
public static final String IN_TEST_HOROVOD_MODE = TONY_APPLICATION_PREFIX + "test.horovod-test-mode-enable";
public static final boolean DEFAULT_IN_TEST_HOROVOD_MODE = false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright 2021 LinkedIn Corporation. All rights reserved. Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
package com.linkedin.tony.horovod;

import java.util.List;

public class DriverCallbackInfo {
private String port;
private String host;
private List<SlotInfo> slotInfos;

public DriverCallbackInfo() {
// ignore
}

public DriverCallbackInfo(String port, String host, List<SlotInfo> slotInfos) {
this.port = port;
this.host = host;
this.slotInfos = slotInfos;
}

public String getPort() {
return port;
}

public void setPort(String port) {
this.port = port;
}

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public List<SlotInfo> getSlotInfos() {
return slotInfos;
}

public void setSlotInfos(List<SlotInfo> slotInfos) {
this.slotInfos = slotInfos;
}
}
Loading

0 comments on commit 00e88b0

Please sign in to comment.