From ce016cf22e6ad17701954cf95e97e970af6e1e99 Mon Sep 17 00:00:00 2001 From: Jay Date: Tue, 21 Feb 2017 22:55:00 -0800 Subject: [PATCH] [EAGLE-906] phase-2 nimbus.host to nimbus.seeds --- .../engine/coordinator/StreamingCluster.java | 8 ++++---- .../coordinator/TopologyMgmtService.java | 4 ++-- .../mock/TestTopologyMgmtService.java | 3 ++- .../impl/TopologyMgmtResourceImpl.java | 15 +++++---------- .../impl/StormExecutionRuntime.java | 19 +++++++++---------- .../src/test/resources/application.conf | 2 +- .../src/main/resources/application.conf | 2 +- .../src/test/resources/application.conf | 2 +- .../src/main/resources/application.conf | 2 +- .../src/main/resources/application.conf | 2 +- .../src/test/resources/application-test.conf | 2 +- .../src/main/resources/application.conf | 2 +- .../src/main/conf/eagle.conf | 2 +- .../src/main/resources/application.conf | 2 +- 14 files changed, 31 insertions(+), 36 deletions(-) diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java index 1e40309215..62eafc048c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java @@ -39,7 +39,7 @@ public static enum StreamingType { * key - nimbus for storm. */ @JsonProperty - private Map deployments; + private Map deployments; public String getName() { return name; @@ -73,15 +73,15 @@ public void setDescription(String description) { this.description = description; } - public Map getDeployments() { + public Map getDeployments() { return deployments; } - public void setDeployments(Map deployments) { + public void setDeployments(Map deployments) { this.deployments = deployments; } - public static final String NIMBUS_HOST = "nimbusHost"; + public static final String NIMBUS_SEEDS = "nimbusSeeds"; public static final String NIMBUS_THRIFT_PORT = "nimbusThriftPort"; } diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java index 4ca9d5ecd4..c54c55b7af 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java @@ -38,14 +38,14 @@ public static class TopologyMeta { public TopologyUsage usage; public String clusterId; - public String nimbusHost; + public List nimbusSeeds; public String nimbusPort; } public static class StormClusterMeta { public String clusterId; - public String nimbusHost; + public List nimbusSeeds; public String nimbusPort; public String stormVersion; } diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java index 80990a56a3..e32a7b81c5 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java @@ -16,6 +16,7 @@ */ package org.apache.alert.coordinator.mock; +import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.tuple.Pair; @@ -62,7 +63,7 @@ public TopologyMeta creatTopology() { TopologyMeta tm = new TopologyMeta(); tm.topologyId = namePrefix + (i++); tm.clusterId = "default-cluster"; - tm.nimbusHost = "localhost"; + tm.nimbusSeeds = Arrays.asList("localhost"); tm.nimbusPort = "3000"; Pair pair = createEmptyTopology(tm.topologyId); tm.topology = pair.getLeft(); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java index 6eaae7b800..c68edef69a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java @@ -35,10 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; public class TopologyMgmtResourceImpl { @@ -46,7 +43,7 @@ public class TopologyMgmtResourceImpl { @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(TopologyMgmtResourceImpl.class); - private static final String DEFAULT_NIMBUS_HOST = "sandbox.hortonworks.com"; + private static final List DEFAULT_NIMBUS_SEEDS = Arrays.asList("sandbox.hortonworks.com"); private static final Integer DEFAULT_NIMBUS_THRIFT_PORT = 6627; private static final String STORM_JAR_PATH = "topology.stormJarPath"; @@ -55,8 +52,7 @@ public class TopologyMgmtResourceImpl { private Map getStormConf(List clusters, String clusterId) throws Exception { Map stormConf = Utils.readStormConfig(); if (clusterId == null) { - // TODO: change to NIMBUS_SEEDS list in EAGLE-907 - stormConf.put(Config.NIMBUS_HOST, DEFAULT_NIMBUS_HOST); + stormConf.put(Config.NIMBUS_SEEDS, DEFAULT_NIMBUS_SEEDS); stormConf.put(Config.NIMBUS_THRIFT_PORT, DEFAULT_NIMBUS_THRIFT_PORT); } else { if (clusters == null) { @@ -69,9 +65,8 @@ private Map getStormConf(List clusters, String clusterId) thro } else { throw new Exception("Fail to find cluster: " + clusterId); } - // TODO: change to NIMBUS_SEEDS list in EAGLE-907 - stormConf.put(Config.NIMBUS_HOST, cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_HOST, DEFAULT_NIMBUS_HOST)); - stormConf.put(Config.NIMBUS_THRIFT_PORT, Integer.valueOf(cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT))); + stormConf.put(Config.NIMBUS_SEEDS, cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_SEEDS, DEFAULT_NIMBUS_SEEDS)); + stormConf.put(Config.NIMBUS_THRIFT_PORT, Integer.valueOf(String.valueOf(cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT)))); } return stormConf; } diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java index 7288700383..fa2d62a6ab 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java @@ -30,12 +30,12 @@ import org.apache.eagle.app.utils.DynamicJarPathFinder; import org.apache.eagle.metadata.model.ApplicationEntity; import org.apache.storm.thrift.TException; -import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Int; import org.apache.storm.trident.spout.RichSpoutBatchExecutor; +import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -70,8 +70,8 @@ public StormEnvironment environment() { public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs"; - private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost"; - private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost"; + private static final String STORM_NIMBUS_SEEDS_CONF_PATH = "application.storm.nimbusSeeds"; + private static final List STORM_NIMBUS_HOST_DEFAULT = Arrays.asList("localhost"); private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627; private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort"; @@ -85,12 +85,12 @@ private org.apache.storm.Config getStormConfig(com.typesafe.config.Config config conf.put(org.apache.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384)); conf.put(org.apache.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384)); conf.put(org.apache.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000)); - String nimbusHost = STORM_NIMBUS_HOST_DEFAULT; - if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) { - nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH); - LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost); + List nimbusSeeds = STORM_NIMBUS_HOST_DEFAULT; + if (environment.config().hasPath(STORM_NIMBUS_SEEDS_CONF_PATH)) { + nimbusSeeds = environment.config().getStringList(STORM_NIMBUS_SEEDS_CONF_PATH); + LOG.info("Overriding {} = {}", STORM_NIMBUS_SEEDS_CONF_PATH,nimbusSeeds); } else { - LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT); + LOG.info("Using default {} = {}", STORM_NIMBUS_SEEDS_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT); } Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT; if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) { @@ -99,8 +99,7 @@ private org.apache.storm.Config getStormConfig(com.typesafe.config.Config config } else { LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT); } - // TODO: change to NIMBUS_SEEDS list in EAGLE-907 - conf.put(Config.NIMBUS_HOST, nimbusHost); + conf.put(Config.NIMBUS_SEEDS, nimbusSeeds); conf.put(org.apache.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort); conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "org.apache.storm.security.auth.SimpleTransportPlugin"); if (config.hasPath(WORKERS)) { diff --git a/eagle-examples/eagle-app-example/src/test/resources/application.conf b/eagle-examples/eagle-app-example/src/test/resources/application.conf index 0c9ba513fa..6a4c6fd225 100644 --- a/eagle-examples/eagle-app-example/src/test/resources/application.conf +++ b/eagle-examples/eagle-app-example/src/test/resources/application.conf @@ -51,7 +51,7 @@ } }, "storm": { - "nimbusHost": "localhost" + "nimbusSeeds": ["localhost"] "nimbusThriftPort": 6627 } }, diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf index 9d690846b2..9db6edf3b9 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf @@ -31,7 +31,7 @@ } "appId":"hadoopQueueMonitorJob", "mode":"LOCAL", - application.storm.nimbusHost=localhost, + application.storm.nimbusSeeds=["localhost"], "workers":1, "dataSinkConfig": { diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf index 9d690846b2..9db6edf3b9 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf +++ b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf @@ -31,7 +31,7 @@ } "appId":"hadoopQueueMonitorJob", "mode":"LOCAL", - application.storm.nimbusHost=localhost, + application.storm.nimbusSeeds=["localhost"], "workers":1, "dataSinkConfig": { diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf index 0421f0960b..9d5acc17da 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/resources/application.conf @@ -48,6 +48,6 @@ "mode":"LOCAL", "workers" : 3, "siteId" : "sandbox", - application.storm.nimbusHost=localhost + application.storm.nimbusSeeds=["localhost"], topology.message.timeout.secs=1800 } \ No newline at end of file diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf index 3836e3acd8..c2e6ef6210 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf @@ -18,7 +18,7 @@ "mode":"LOCAL", "workers" : 3, "siteId" : "sandbox", - application.storm.nimbusHost=localhost + application.storm.nimbusSeeds=["localhost"], "stormConfig" : { "mrHistoryJobSpoutTasks" : 6, diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application-test.conf b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application-test.conf index 057e189bd0..13f6d6c97a 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application-test.conf +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application-test.conf @@ -34,7 +34,7 @@ application { type = org.apache.eagle.app.sink.KafkaStreamSink } storm { - nimbusHost = "server.eagle.apache.org" + nimbusSeeds = ["server.eagle.apache.org"] nimbusThriftPort = 6627 } mailService { diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf index 6d1be06667..03f3b4ff49 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf @@ -16,7 +16,7 @@ { "appId":"mrRunningJob", "mode":"LOCAL", - application.storm.nimbusHost=localhost, + application.storm.nimbusSeeds=["localhost"], "workers" : 8, "siteId" : "sandbox", diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf index a889914f29..a3cd623182 100644 --- a/eagle-server-assembly/src/main/conf/eagle.conf +++ b/eagle-server-assembly/src/main/conf/eagle.conf @@ -87,7 +87,7 @@ application { provider = org.apache.eagle.app.messaging.KafkaStreamProvider } storm { - nimbusHost = "server.eagle.apache.org" + nimbusSeeds = ["server.eagle.apache.org"] nimbusThriftPort = 6627 } mailService { diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf index d657f54cc3..96370f743b 100644 --- a/eagle-server/src/main/resources/application.conf +++ b/eagle-server/src/main/resources/application.conf @@ -90,7 +90,7 @@ application { provider = org.apache.eagle.app.messaging.KafkaStreamProvider } storm { - nimbusHost = "server.eagle.apache.org" + nimbusSeeds = ["server.eagle.apache.org"] nimbusThriftPort = 6627 } updateStatus: {