diff --git a/build.gradle b/build.gradle
index b7d28983..c1dc8597 100644
--- a/build.gradle
+++ b/build.gradle
@@ -89,6 +89,8 @@ subprojects {
apply plugin: "license"
apply plugin: "com.google.protobuf"
apply plugin: "java"
+ apply plugin: "checkstyle"
+ apply plugin: "findbugs"
apply plugin: "eclipse"
apply plugin: "idea"
apply plugin: "maven-publish"
@@ -180,6 +182,26 @@ subprojects {
signing {
sign publishing.publications.mavenJava
}
+
+ checkstyle {
+ configFile rootProject.file('config/checkstyle/checkstyle.xml')
+ maxWarnings 0
+ }
+
+ tasks.withType(FindBugs) {
+ reports {
+ xml.enabled false
+ html.enabled true
+ }
+
+ classpath = classpath.filter {
+ // POM files are getting included in the classpath for some reason, but this causes parsing errors when FindBugs
+ // tries to analyze them, thinking they are zips. Excluding POM files as a workaround.
+ !it.name.endsWith('.pom')
+ }
+
+ excludeFilter = rootProject.file('config/findbugs/findbugsExclude.xml')
+ }
}
apply plugin: 'distribution'
diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml
new file mode 100644
index 00000000..a9bffe83
--- /dev/null
+++ b/config/checkstyle/checkstyle.xml
@@ -0,0 +1,198 @@
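The added checkstyle.xml is a 198-line rule set whose XML markup did not survive in this excerpt. For orientation only, a Checkstyle configuration of the kind build.gradle points at (a Checker root with a TreeWalker and a suppressions filter) has roughly the following shape; the module names and the ${config_loc} wiring are assumptions, not the project's actual rules:

  <?xml version="1.0"?>
  <!DOCTYPE module PUBLIC
      "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
      "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
  <module name="Checker">
    <!-- Assumed: pick up the suppressions file added below; the Gradle checkstyle plugin supplies config_loc. -->
    <module name="SuppressionFilter">
      <property name="file" value="${config_loc}/suppressions.xml"/>
    </module>
    <module name="TreeWalker">
      <!-- Assumed, commonly enabled checks. -->
      <module name="UnusedImports"/>
      <module name="WhitespaceAround"/>
    </module>
  </module>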
diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml
new file mode 100644
index 00000000..253f46d4
--- /dev/null
+++ b/config/checkstyle/suppressions.xml
@@ -0,0 +1,7 @@
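The suppressions.xml added here follows Checkstyle's standard suppressions DTD; its entries were likewise not preserved. A minimal file of that shape looks like the sketch below, where the single suppress rule is a hypothetical example rather than the project's real entry:

  <?xml version="1.0"?>
  <!DOCTYPE suppressions PUBLIC
      "-//Puppy Crawl//DTD Suppressions 1.1//EN"
      "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
  <suppressions>
    <!-- Hypothetical example: silence all checks for generated sources. -->
    <suppress checks=".*" files="[/\\]generated[/\\].*"/>
  </suppressions>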
diff --git a/config/findbugs/findbugsExclude.xml b/config/findbugs/findbugsExclude.xml
new file mode 100644
index 00000000..3b6da0f1
--- /dev/null
+++ b/config/findbugs/findbugsExclude.xml
@@ -0,0 +1,5 @@
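findbugsExclude.xml is the filter referenced by excludeFilter in build.gradle and uses the standard FindBugs filter format; its Match entries were not preserved. A minimal exclude filter looks like the sketch below, with a hypothetical class pattern standing in for the real one:

  <?xml version="1.0" encoding="UTF-8"?>
  <FindBugsFilter>
    <!-- Hypothetical example: ignore findings in generated protobuf classes. -->
    <Match>
      <Class name="~.*\.proto\..*"/>
    </Match>
  </FindBugsFilter>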
diff --git a/tony-azkaban/src/main/java/com/linkedin/tony/azkaban/TensorFlowJob.java b/tony-azkaban/src/main/java/com/linkedin/tony/azkaban/TensorFlowJob.java
index 82069110..cf889f04 100644
--- a/tony-azkaban/src/main/java/com/linkedin/tony/azkaban/TensorFlowJob.java
+++ b/tony-azkaban/src/main/java/com/linkedin/tony/azkaban/TensorFlowJob.java
@@ -84,40 +84,41 @@ protected String getJVMArguments() {
@Override
protected String getMainArguments() {
- String args = super.getMainArguments();
+ StringBuilder args = new StringBuilder(super.getMainArguments());
info("All job props: " + getJobProps());
info("All sys props: " + getSysProps());
String srcDir = getJobProps().getString(TensorFlowJobArg.SRC_DIR.azPropName, "src");
- args += " " + TensorFlowJobArg.SRC_DIR.tonyParamName + " " + srcDir;
+ args.append(" " + TensorFlowJobArg.SRC_DIR.tonyParamName + " " + srcDir);
String hdfsClasspath = getJobProps().getString(TensorFlowJobArg.HDFS_CLASSPATH.azPropName, null);
if (hdfsClasspath != null) {
- args += " " + TensorFlowJobArg.HDFS_CLASSPATH.tonyParamName + " " + hdfsClasspath;
+ args.append(" " + TensorFlowJobArg.HDFS_CLASSPATH.tonyParamName + " " + hdfsClasspath);
}
Map<String, String> workerEnvs = getJobProps().getMapByPrefix(WORKER_ENV_PREFIX);
for (Map.Entry<String, String> workerEnv : workerEnvs.entrySet()) {
- args += " " + TensorFlowJobArg.SHELL_ENV.tonyParamName + " " + workerEnv.getKey() + "=" + workerEnv.getValue();
+ args.append(" " + TensorFlowJobArg.SHELL_ENV.tonyParamName + " " + workerEnv.getKey()
+ + "=" + workerEnv.getValue());
}
String taskParams = getJobProps().getString(TensorFlowJobArg.TASK_PARAMS.azPropName, null);
if (taskParams != null) {
- args += " " + TensorFlowJobArg.TASK_PARAMS.tonyParamName + " '" + taskParams + "'";
+ args.append(" " + TensorFlowJobArg.TASK_PARAMS.tonyParamName + " '" + taskParams + "'");
}
String pythonBinaryPath = getJobProps().getString(TensorFlowJobArg.PYTHON_BINARY_PATH.azPropName, null);
if (pythonBinaryPath != null) {
- args += " " + TensorFlowJobArg.PYTHON_BINARY_PATH.tonyParamName + " " + pythonBinaryPath;
+ args.append(" " + TensorFlowJobArg.PYTHON_BINARY_PATH.tonyParamName + " " + pythonBinaryPath);
}
String pythonVenv = getJobProps().getString(TensorFlowJobArg.PYTHON_VENV.azPropName, null);
if (pythonVenv != null) {
- args += " " + TensorFlowJobArg.PYTHON_VENV.tonyParamName + " " + pythonVenv;
+ args.append(" " + TensorFlowJobArg.PYTHON_VENV.tonyParamName + " " + pythonVenv);
}
String executes = getJobProps().getString(TensorFlowJobArg.EXECUTES.azPropName, null);
if (executes != null) {
- args += " " + TensorFlowJobArg.EXECUTES.tonyParamName + " " + executes;
+ args.append(" " + TensorFlowJobArg.EXECUTES.tonyParamName + " " + executes);
}
Map<String, String> tonyConfs = getJobProps().getMapByPrefix(TONY_CONF_PREFIX);
@@ -127,7 +128,9 @@ protected String getMainArguments() {
}
// Write user's tony confs to an xml to be localized.
- tonyConfFile.getParentFile().mkdir();
+ if (!tonyConfFile.getParentFile().mkdir()) {
+ throw new RuntimeException("Failed to create parent directory for TonY conf file.");
+ }
try (OutputStream os = new FileOutputStream(tonyConfFile)) {
tfConf.writeXml(os);
} catch (Exception e) {
@@ -136,6 +139,6 @@ protected String getMainArguments() {
info("Complete main arguments: " + args);
- return args;
+ return args.toString();
}
}
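A caveat on the new mkdir() guard in getMainArguments(): File.mkdir() returns false both on failure and when the directory already exists, so the check above would also throw on a rerun that finds the directory already in place. A more tolerant variant, shown only as a sketch and not as part of this patch, could be:

  File parent = tonyConfFile.getParentFile();
  // mkdirs() creates any missing ancestors; only fail when the directory is genuinely absent.
  if (!parent.isDirectory() && !parent.mkdirs()) {
    throw new RuntimeException("Failed to create parent directory " + parent + " for TonY conf file.");
  }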
diff --git a/tony-azkaban/src/test/java/com/linkedin/tony/azkaban/TestTensorFlowJob.java b/tony-azkaban/src/test/java/com/linkedin/tony/azkaban/TestTensorFlowJob.java
index 848ae029..2a098cfd 100644
--- a/tony-azkaban/src/test/java/com/linkedin/tony/azkaban/TestTensorFlowJob.java
+++ b/tony-azkaban/src/test/java/com/linkedin/tony/azkaban/TestTensorFlowJob.java
@@ -29,7 +29,7 @@ public class TestTensorFlowJob {
private static void initServiceProvider() {
final Injector injector = Guice.createInjector(new AbstractModule() {
@Override
- protected void configure() {}
+ protected void configure() { }
});
// Because SERVICE_PROVIDER is a singleton and it is shared among many tests,
// need to reset the state to avoid assertion failures.
diff --git a/tony-cli/src/main/java/com/linkedin/tony/cli/NotebookSubmitter.java b/tony-cli/src/main/java/com/linkedin/tony/cli/NotebookSubmitter.java
index 21183afa..6e3d2825 100644
--- a/tony-cli/src/main/java/com/linkedin/tony/cli/NotebookSubmitter.java
+++ b/tony-cli/src/main/java/com/linkedin/tony/cli/NotebookSubmitter.java
@@ -58,7 +58,7 @@ public int submit(String[] args) throws ParseException, URISyntaxException, IOEx
opts.addOption("src_dir", true, "Name of directory of source files.");
int exitCode = 0;
- Path cachedLibPath = null;
+ Path cachedLibPath;
Configuration hdfsConf = new Configuration();
try (FileSystem fs = FileSystem.get(hdfsConf)) {
cachedLibPath = new Path(fs.getHomeDirectory(), TONY_FOLDER + Path.SEPARATOR + UUID.randomUUID().toString());
@@ -67,10 +67,7 @@ public int submit(String[] args) throws ParseException, URISyntaxException, IOEx
LOG.info("Copying " + jarPath + " to: " + cachedLibPath);
} catch (IOException e) {
LOG.fatal("Failed to create FileSystem: ", e);
- exitCode = -1;
- }
- if (cachedLibPath == null) {
- System.exit(-1);
+ return -1;
}
String[] updatedArgs = Arrays.copyOf(args, args.length + 4);
@@ -81,9 +78,6 @@ public int submit(String[] args) throws ParseException, URISyntaxException, IOEx
client.init(updatedArgs);
client.start();
- if (client == null) {
- System.exit(-1);
- }
Thread clientThread = new Thread(client::start);
clientThread.start();
while (clientThread.isAlive()) {
@@ -118,6 +112,5 @@ public static void main(String[] args) throws Exception {
NotebookSubmitter submitter = new NotebookSubmitter();
exitCode = submitter.submit(args);
System.exit(exitCode);
-
}
}
diff --git a/tony-core/src/main/java/com/linkedin/tony/TFConfig.java b/tony-core/src/main/java/com/linkedin/tony/TFConfig.java
index fd5367d7..4158cc62 100644
--- a/tony-core/src/main/java/com/linkedin/tony/TFConfig.java
+++ b/tony-core/src/main/java/com/linkedin/tony/TFConfig.java
@@ -20,7 +20,7 @@ public static class Task {
private int index;
// Jackson needs a default constructor
- Task() {}
+ Task() { }
Task(String type, int index) {
this.type = type;
@@ -47,7 +47,7 @@ public void setIndex(int index) {
}
// Jackson needs a default constructor
- TFConfig() {}
+ TFConfig() { }
public TFConfig(Map<String, List<String>> clusterSpec, String jobName, int taskIndex) {
this.clusterSpec = clusterSpec;
diff --git a/tony-core/src/main/java/com/linkedin/tony/TFPolicyProvider.java b/tony-core/src/main/java/com/linkedin/tony/TFPolicyProvider.java
index 14b6583b..aaef4914 100644
--- a/tony-core/src/main/java/com/linkedin/tony/TFPolicyProvider.java
+++ b/tony-core/src/main/java/com/linkedin/tony/TFPolicyProvider.java
@@ -9,18 +9,11 @@
import com.linkedin.tony.rpc.TensorFlowCluster;
/**
- * * PolicyProvider for Client to AM protocol.
- * */
+ * PolicyProvider for Client to AM protocol.
+ **/
public class TFPolicyProvider extends PolicyProvider {
-
- private static final Service[] TF_AM_SERVICE =
- new Service[]{
- new Service(
- "security.tf.client-am-protocol.acl",
- TensorFlowCluster.class)};
-
@Override
public Service[] getServices() {
- return TF_AM_SERVICE;
- };
+ return new Service[]{new Service("security.tf.client-am-protocol.acl", TensorFlowCluster.class)};
+ }
}
diff --git a/tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java b/tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java
index bdcf2307..6b7e5522 100644
--- a/tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java
+++ b/tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java
@@ -165,6 +165,8 @@ public static void main(String[] args) throws Exception {
executor.shellEnv.put(Constants.WORLD, String.valueOf(executor.numTasks));
break;
}
+ default:
+ throw new RuntimeException("Unsupported executor framework: " + executor.framework);
}
int exitCode = Utils.executeShell(executor.taskCommand, executor.timeOut, executor.shellEnv);
@@ -210,8 +212,6 @@ private String registerAndGetClusterSpec(String amAddress) {
String hostName = Utils.getCurrentHostName();
LOG.info("ContainerId is: " + containerId + " HostName is: " + hostName);
- hangIfTesting();
-
// Start the Heartbeater..
hbExec.scheduleAtFixedRate(new Heartbeater(),
0, hbInterval, TimeUnit.MILLISECONDS);
@@ -273,9 +273,10 @@ public void run() {
} catch (Exception e) {
LOG.error("[" + taskId + "] Failed to send Heart Beat.", e);
if (++numFailedHBAttempts > MAX_NUM_FAILED_HB_ATTEMPTS) {
- LOG.error("[" + taskId + "] Exceeded Failed Heart Beat send attempts.. going to die !!");
+ LOG.error("[" + taskId + "] Exceeded max number of allowed failed heart beat send attempts. "
+ + "Going to stop heartbeating!");
e.printStackTrace();
- System.exit(-1);
+ throw new RuntimeException(e);
} else {
LOG.warn("Will retry heartbeat..");
}
@@ -283,7 +284,7 @@ public void run() {
}
}
- //region TonyDataFeed
+ // Start region TonyDataFeed
// TODO : currently requires caller (tf job) to provide the path to read
// maybe a better abstraction if task executor itself figures this out (if
@@ -303,30 +304,7 @@ public HdfsAvroFileSplitReader getHdfsAvroFileSplitReader(List<String> readPaths
return new HdfsAvroFileSplitReader(hdfsConf, readPaths, this.taskIndex,
this.numTasks, useRandomShuffle);
}
-
-
- //endregion
-
- //region Testing
-
- private void hangIfTesting() {
- // Simulate hanging task executor if enabled and is first attempt
- String shouldHang = System.getenv(Constants.TEST_TASK_EXECUTOR_HANG);
- String attempt = System.getenv(Constants.ATTEMPT_NUMBER);
- int attemptNumber = attempt == null ? 0 : Integer.valueOf(attempt);
- if (shouldHang != null && Boolean.parseBoolean(shouldHang) && attemptNumber < 1) {
- LOG.info("Hanging for 20 seconds for testing purposes");
- try {
- Thread.sleep(20000);
- } catch (InterruptedException e) {
- LOG.error("Thread interrupted while hanging forever", e);
- }
- // We still exit after 20 seconds to prevent this process from sticking around forever.
- // In the cluster, when using cgroups, when the container for this process is killed, this process will also be
- // killed, but when using MiniYARNCluster, that's not the case, so this process still needs to exit during tests.
- System.exit(-1);
- }
- }
+ // End region TonyDataFeed
private void skewAndHangIfTesting() {
String skewInstr = System.getenv(Constants.TEST_TASK_EXECUTOR_SKEW);
@@ -349,6 +327,4 @@ private void skewAndHangIfTesting() {
}
}
}
- //endregion
-
}
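The heartbeat change above replaces System.exit(-1) with a RuntimeException thrown from Heartbeater.run(). Because the Heartbeater is submitted via scheduleAtFixedRate, an uncaught exception cancels all further executions and only surfaces through the task's Future, so logging before throwing is what actually records the failure. A self-contained sketch of that executor behavior (class and message names are illustrative, not TonY code):

  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.TimeUnit;

  public class PeriodicTaskDemo {
    public static void main(String[] args) throws Exception {
      ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
      // A periodic task that gives up after a few runs, mimicking the heartbeat loop giving up.
      ScheduledFuture<?> future = exec.scheduleAtFixedRate(new Runnable() {
        private int runs = 0;
        @Override
        public void run() {
          if (++runs > 3) {
            throw new RuntimeException("exceeded max failed heartbeat attempts");
          }
        }
      }, 0, 10, TimeUnit.MILLISECONDS);
      try {
        future.get(); // blocks until the task throws; the task is never rescheduled afterwards
      } catch (ExecutionException e) {
        System.out.println("Periodic task stopped: " + e.getCause().getMessage());
      } finally {
        exec.shutdownNow();
      }
    }
  }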
diff --git a/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java b/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java
index 22568b51..5fdc2bf4 100644
--- a/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java
+++ b/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java
@@ -24,11 +24,13 @@
import com.linkedin.tony.util.Utils;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileReader;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -86,8 +88,6 @@
import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
import py4j.GatewayServer;
-import static com.linkedin.tony.TonyConfigurationKeys.*;
-
public class TonyApplicationMaster {
private static final Log LOG = LogFactory.getLog(TonyApplicationMaster.class);
@@ -146,7 +146,7 @@ public class TonyApplicationMaster {
private TonySession.Builder sessionBuilder;
/** Configuration **/
- private static Configuration yarnConf;
+ private Configuration yarnConf;
private Configuration hdfsConf;
/** Cluster spec **/
@@ -176,9 +176,6 @@ public class TonyApplicationMaster {
private volatile boolean taskHasMissesHB = false;
private Thread mainThread;
- /** Handle different machine frameworks **/
- private MLFramework framework;
-
private TonyApplicationMaster() {
hdfsConf = new Configuration();
yarnConf = new Configuration();
@@ -264,8 +261,6 @@ private boolean init(String[] args) {
TonyConfigurationKeys.DEFAULT_TASK_HEARTBEAT_INTERVAL_MS);
maxConsecutiveHBMiss = tonyConf.getInt(TonyConfigurationKeys.TASK_MAX_MISSED_HEARTBEATS,
TonyConfigurationKeys.DEFAULT_TASK_MAX_MISSED_HEARTBEATS);
- framework = MLFramework.valueOf(tonyConf.get(TonyConfigurationKeys.FRAMEWORK_NAME,
- TonyConfigurationKeys.DEFAULT_FRAMEWORK_NAME).toUpperCase());
tonyHistoryFolder = tonyConf.get(TonyConfigurationKeys.TONY_HISTORY_LOCATION,
TonyConfigurationKeys.DEFAULT_TONY_HISTORY_LOCATION);
@@ -509,7 +504,7 @@ private void writeConfigFile(FileSystem fs, Path jobDir) throws IOException {
if (jobDir == null) {
return;
}
- Path configFile = new Path(jobDir,"config.xml");
+ Path configFile = new Path(jobDir, "config.xml");
try (FSDataOutputStream out = fs.create(configFile)) {
tonyConf.writeXml(out);
} catch (IOException e) {
@@ -744,9 +739,9 @@ private int doPreprocessingJob() throws Exception {
session.setFinalStatus(FinalApplicationStatus.FAILED, "Preprocessing job failed.");
return exitCode;
}
- try (BufferedReader reader = new BufferedReader(new FileReader(
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(
System.getProperty(YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR)
- + File.separatorChar + Constants.AM_STDOUT_FILENAME))) {
+ + File.separatorChar + Constants.AM_STDOUT_FILENAME), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
if (line.contains("Model parameters: ")) {
@@ -1042,7 +1037,7 @@ public void onContainersAllocated(List<Container> containers) {
+ ", containerId = " + container.getId()
+ ", containerNode = " + container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
+ ", resourceRequest = " + container.getResource());
- new ContainerLauncher(container, containerListener).run();
+ new ContainerLauncher(container).run();
}
}
@@ -1074,11 +1069,9 @@ public void onError(Throwable throwable) {
*/
private class ContainerLauncher implements Runnable {
Container container;
- NMCallbackHandler containerListener;
- ContainerLauncher(Container container, NMCallbackHandler containerListener) {
+ ContainerLauncher(Container container) {
this.container = container;
- this.containerListener = containerListener;
}
/**
diff --git a/tony-core/src/main/java/com/linkedin/tony/TonyConfigurationKeys.java b/tony-core/src/main/java/com/linkedin/tony/TonyConfigurationKeys.java
index c8aa1894..7f6c59a3 100644
--- a/tony-core/src/main/java/com/linkedin/tony/TonyConfigurationKeys.java
+++ b/tony-core/src/main/java/com/linkedin/tony/TonyConfigurationKeys.java
@@ -18,7 +18,7 @@ private TonyConfigurationKeys() {
// Version info configuration
public static final String TONY_VERSION_INFO_PREFIX = TONY_PREFIX + "version-info.";
- public static final String TONY_VERSION_INFO_VERSION= TONY_VERSION_INFO_PREFIX + "version";
+ public static final String TONY_VERSION_INFO_VERSION = TONY_VERSION_INFO_PREFIX + "version";
public static final String TONY_VERSION_INFO_REVISION = TONY_VERSION_INFO_PREFIX + "revision";
public static final String TONY_VERSION_INFO_BRANCH = TONY_VERSION_INFO_PREFIX + "branch";
public static final String TONY_VERSION_INFO_USER = TONY_VERSION_INFO_PREFIX + "user";
diff --git a/tony-core/src/main/java/com/linkedin/tony/io/HdfsAvroFileSplitReader.java b/tony-core/src/main/java/com/linkedin/tony/io/HdfsAvroFileSplitReader.java
index 9618464e..06ce8428 100644
--- a/tony-core/src/main/java/com/linkedin/tony/io/HdfsAvroFileSplitReader.java
+++ b/tony-core/src/main/java/com/linkedin/tony/io/HdfsAvroFileSplitReader.java
@@ -415,7 +415,7 @@ public List<FileAccessInfo> createReadInfo(List<String> readPaths,
return filesToRead;
}
- private class FileAccessInfo {
+ private static class FileAccessInfo {
final String filePath;
final long startOffset;
final long readLength;
@@ -472,7 +472,7 @@ public String getSchemaJson() {
* as a byte array. Python code can simply parse this
* in memory byte array as if it is a regular avro file.
*/
- class FileObject extends OutputStream {
+ static class FileObject extends OutputStream {
ByteArrayOutputStream stream;
@@ -769,7 +769,9 @@ T poll(int time, TimeUnit unit) throws InterruptedException {
int attempt = 100;
while (list.size() < pollingThreshold * this.bufferSize
&& !fetcher.readFinished && attempt-- > 0) {
- bufferReady.await(10, TimeUnit.MILLISECONDS);
+ if (!bufferReady.await(10, TimeUnit.MILLISECONDS) && attempt % 20 == 0) {
+ LOG.warn("Read buffer not ready");
+ }
}
if (attempt <= 0) {
return null;
diff --git a/tony-core/src/main/java/com/linkedin/tony/rpc/ApplicationRpcServer.java b/tony-core/src/main/java/com/linkedin/tony/rpc/ApplicationRpcServer.java
index a40c8a52..44686b93 100644
--- a/tony-core/src/main/java/com/linkedin/tony/rpc/ApplicationRpcServer.java
+++ b/tony-core/src/main/java/com/linkedin/tony/rpc/ApplicationRpcServer.java
@@ -8,6 +8,7 @@
import com.linkedin.tony.TFPolicyProvider;
import com.linkedin.tony.rpc.impl.pb.service.TensorFlowClusterPBServiceImpl;
import java.io.IOException;
+import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -22,8 +23,8 @@
public class ApplicationRpcServer extends Thread implements TensorFlowCluster {
- private static final RecordFactory RECORD_FACTORY =
- RecordFactoryProvider.getRecordFactory(null);
+ private static final RecordFactory RECORD_FACTORY = RecordFactoryProvider.getRecordFactory(null);
+ private static final Random RANDOM_NUMBER_GENERATOR = new Random();
private final int rpcPort;
private final String rpcAddress;
private final ApplicationRpc appRpc;
@@ -33,7 +34,7 @@ public class ApplicationRpcServer extends Thread implements TensorFlowCluster {
public ApplicationRpcServer(String hostname, ApplicationRpc rpc, Configuration conf) {
this.rpcAddress = hostname;
- this.rpcPort = 10000 + ((int) (Math.random() * (5000)) + 1);
+ this.rpcPort = 10000 + RANDOM_NUMBER_GENERATOR.nextInt(5000) + 1;
this.appRpc = rpc;
this.conf = conf;
if (conf == null) {
diff --git a/tony-core/src/main/java/com/linkedin/tony/rpc/TaskUrl.java b/tony-core/src/main/java/com/linkedin/tony/rpc/TaskUrl.java
index 812f3790..0ef502ae 100644
--- a/tony-core/src/main/java/com/linkedin/tony/rpc/TaskUrl.java
+++ b/tony-core/src/main/java/com/linkedin/tony/rpc/TaskUrl.java
@@ -4,6 +4,8 @@
*/
package com.linkedin.tony.rpc;
+import java.util.Objects;
+
/**
* Contains the name, index, and URL for a task.
@@ -38,4 +40,22 @@ public int compareTo(TaskUrl other) {
}
return Integer.valueOf(this.index).compareTo(Integer.valueOf(other.index));
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TaskUrl taskUrl = (TaskUrl) o;
+ return Objects.equals(name, taskUrl.name) && Objects.equals(index, taskUrl.index) && Objects.equals(url,
+ taskUrl.url);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, index, url);
+ }
}
diff --git a/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java b/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java
index a009b8a3..821be5a9 100644
--- a/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java
+++ b/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java
@@ -6,7 +6,6 @@
import com.google.common.base.Preconditions;
import com.linkedin.tony.Constants;
-import com.linkedin.tony.TonyConfigurationKeys;
import com.linkedin.tony.util.Utils;
import com.linkedin.tony.rpc.TaskUrl;
import java.net.URI;
@@ -21,9 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -100,8 +97,8 @@ private TonySession(Builder builder) {
this.jvmArgs = builder.jvmArgs;
this.tonyConf = builder.tonyConf;
- for (String jobName : containerRequests.keySet()) {
- jobTasks.put(jobName, new TonyTask[containerRequests.get(jobName).getNumInstances()]);
+ for (Map.Entry entry : containerRequests.entrySet()) {
+ jobTasks.put(entry.getKey(), new TonyTask[entry.getValue().getNumInstances()]);
}
}
@@ -122,8 +119,8 @@ public void setResources(Configuration yarnConf,
Map<String, String> env = System.getenv();
String tonyConfPath = env.get(Constants.TONY_CONF_PREFIX + Constants.PATH_SUFFIX);
- long tonyConfTimestamp = Long.valueOf(env.get(Constants.TONY_CONF_PREFIX + Constants.TIMESTAMP_SUFFIX));
- long tonyConfLength = Long.valueOf(env.get(Constants.TONY_CONF_PREFIX + Constants.LENGTH_SUFFIX));
+ long tonyConfTimestamp = Long.parseLong(env.get(Constants.TONY_CONF_PREFIX + Constants.TIMESTAMP_SUFFIX));
+ long tonyConfLength = Long.parseLong(env.get(Constants.TONY_CONF_PREFIX + Constants.LENGTH_SUFFIX));
LocalResource tonyConfResource =
LocalResource.newInstance(ConverterUtils.getYarnUrlFromURI(URI.create(tonyConfPath)),
@@ -358,8 +355,8 @@ private TonyTask getTask(String jobName, String taskIndex) {
* Returns true if the job is "chief" or if there is no "chief" job and ("worker", "0") is passed in.
*/
public boolean isChief(String jobName, String index) {
- return jobName.equals(CHIEF_JOB_NAME) || (!jobTasks.containsKey(CHIEF_JOB_NAME) &&
- jobName.equals(WORKER_JOB_NAME) && index.equals("0"));
+ return jobName.equals(CHIEF_JOB_NAME) || (!jobTasks.containsKey(CHIEF_JOB_NAME)
+ && jobName.equals(WORKER_JOB_NAME) && index.equals("0"));
}
public TonyTask getTask(ContainerId containerId) {
diff --git a/tony-core/src/main/java/com/linkedin/tony/util/HdfsUtils.java b/tony-core/src/main/java/com/linkedin/tony/util/HdfsUtils.java
index 826e1dd9..6e50bcdb 100644
--- a/tony-core/src/main/java/com/linkedin/tony/util/HdfsUtils.java
+++ b/tony-core/src/main/java/com/linkedin/tony/util/HdfsUtils.java
@@ -8,6 +8,7 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -75,7 +76,7 @@ public static boolean pathExists(FileSystem fs, Path filePath) {
public static String contentOfHdfsFile(FileSystem fs, Path filePath) {
StringBuilder fileContent = new StringBuilder();
try (FSDataInputStream inStrm = fs.open(filePath);
- BufferedReader bufReader = new BufferedReader(new InputStreamReader(inStrm))) {
+ BufferedReader bufReader = new BufferedReader(new InputStreamReader(inStrm, StandardCharsets.UTF_8))) {
String line;
while ((line = bufReader.readLine()) != null) {
fileContent.append(line);
diff --git a/tony-core/src/main/java/com/linkedin/tony/util/Utils.java b/tony-core/src/main/java/com/linkedin/tony/util/Utils.java
index 861a220b..ab7ace5f 100644
--- a/tony-core/src/main/java/com/linkedin/tony/util/Utils.java
+++ b/tony-core/src/main/java/com/linkedin/tony/util/Utils.java
@@ -384,7 +384,7 @@ public static String constructTFConfig(String clusterSpec, String jobName, int t
ObjectMapper mapper = new ObjectMapper();
try {
Map<String, List<String>> spec =
- mapper.readValue(clusterSpec, new TypeReference