Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
[EAGLE-906] phase-2 nimbus.host to nimbus.seeds
Browse files Browse the repository at this point in the history
  • Loading branch information
senjaliya committed Feb 22, 2017
1 parent 6543eab commit ce016cf
Show file tree
Hide file tree
Showing 14 changed files with 31 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static enum StreamingType {
* key - nimbus for storm.
*/
@JsonProperty
private Map<String, String> deployments;
private Map<String, Object> deployments;

public String getName() {
return name;
Expand Down Expand Up @@ -73,15 +73,15 @@ public void setDescription(String description) {
this.description = description;
}

public Map<String, String> getDeployments() {
public Map<String, Object> getDeployments() {
return deployments;
}

public void setDeployments(Map<String, String> deployments) {
public void setDeployments(Map<String, Object> deployments) {
this.deployments = deployments;
}

public static final String NIMBUS_HOST = "nimbusHost";
public static final String NIMBUS_SEEDS = "nimbusSeeds";
public static final String NIMBUS_THRIFT_PORT = "nimbusThriftPort";

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public static class TopologyMeta {
public TopologyUsage usage;

public String clusterId;
public String nimbusHost;
public List<String> nimbusSeeds;
public String nimbusPort;

}

public static class StormClusterMeta {
public String clusterId;
public String nimbusHost;
public List<String> nimbusSeeds;
public String nimbusPort;
public String stormVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.alert.coordinator.mock;

import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -62,7 +63,7 @@ public TopologyMeta creatTopology() {
TopologyMeta tm = new TopologyMeta();
tm.topologyId = namePrefix + (i++);
tm.clusterId = "default-cluster";
tm.nimbusHost = "localhost";
tm.nimbusSeeds = Arrays.asList("localhost");
tm.nimbusPort = "3000";
Pair<Topology, TopologyUsage> pair = createEmptyTopology(tm.topologyId);
tm.topology = pair.getLeft();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;


public class TopologyMgmtResourceImpl {
private static final IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(TopologyMgmtResourceImpl.class);

private static final String DEFAULT_NIMBUS_HOST = "sandbox.hortonworks.com";
private static final List<String> DEFAULT_NIMBUS_SEEDS = Arrays.asList("sandbox.hortonworks.com");
private static final Integer DEFAULT_NIMBUS_THRIFT_PORT = 6627;
private static final String STORM_JAR_PATH = "topology.stormJarPath";

Expand All @@ -55,8 +52,7 @@ public class TopologyMgmtResourceImpl {
private Map getStormConf(List<StreamingCluster> clusters, String clusterId) throws Exception {
Map<String, Object> stormConf = Utils.readStormConfig();
if (clusterId == null) {
// TODO: change to NIMBUS_SEEDS list in EAGLE-907
stormConf.put(Config.NIMBUS_HOST, DEFAULT_NIMBUS_HOST);
stormConf.put(Config.NIMBUS_SEEDS, DEFAULT_NIMBUS_SEEDS);
stormConf.put(Config.NIMBUS_THRIFT_PORT, DEFAULT_NIMBUS_THRIFT_PORT);
} else {
if (clusters == null) {
Expand All @@ -69,9 +65,8 @@ private Map getStormConf(List<StreamingCluster> clusters, String clusterId) thro
} else {
throw new Exception("Fail to find cluster: " + clusterId);
}
// TODO: change to NIMBUS_SEEDS list in EAGLE-907
stormConf.put(Config.NIMBUS_HOST, cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_HOST, DEFAULT_NIMBUS_HOST));
stormConf.put(Config.NIMBUS_THRIFT_PORT, Integer.valueOf(cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT)));
stormConf.put(Config.NIMBUS_SEEDS, cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_SEEDS, DEFAULT_NIMBUS_SEEDS));
stormConf.put(Config.NIMBUS_THRIFT_PORT, Integer.valueOf(String.valueOf(cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT))));
}
return stormConf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import org.apache.eagle.app.utils.DynamicJarPathFinder;
import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.storm.thrift.TException;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Int;
import org.apache.storm.trident.spout.RichSpoutBatchExecutor;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -70,8 +70,8 @@ public StormEnvironment environment() {

public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";

private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
private static final String STORM_NIMBUS_SEEDS_CONF_PATH = "application.storm.nimbusSeeds";
private static final List<String> STORM_NIMBUS_HOST_DEFAULT = Arrays.asList("localhost");
private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";

Expand All @@ -85,12 +85,12 @@ private org.apache.storm.Config getStormConfig(com.typesafe.config.Config config
conf.put(org.apache.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
conf.put(org.apache.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
conf.put(org.apache.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
List<String> nimbusSeeds = STORM_NIMBUS_HOST_DEFAULT;
if (environment.config().hasPath(STORM_NIMBUS_SEEDS_CONF_PATH)) {
nimbusSeeds = environment.config().getStringList(STORM_NIMBUS_SEEDS_CONF_PATH);
LOG.info("Overriding {} = {}", STORM_NIMBUS_SEEDS_CONF_PATH,nimbusSeeds);
} else {
LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
LOG.info("Using default {} = {}", STORM_NIMBUS_SEEDS_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
}
Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT;
if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
Expand All @@ -99,8 +99,7 @@ private org.apache.storm.Config getStormConfig(com.typesafe.config.Config config
} else {
LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
}
// TODO: change to NIMBUS_SEEDS list in EAGLE-907
conf.put(Config.NIMBUS_HOST, nimbusHost);
conf.put(Config.NIMBUS_SEEDS, nimbusSeeds);
conf.put(org.apache.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, "org.apache.storm.security.auth.SimpleTransportPlugin");
if (config.hasPath(WORKERS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
}
},
"storm": {
"nimbusHost": "localhost"
"nimbusSeeds": ["localhost"]
"nimbusThriftPort": 6627
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
}
"appId":"hadoopQueueMonitorJob",
"mode":"LOCAL",
application.storm.nimbusHost=localhost,
application.storm.nimbusSeeds=["localhost"],
"workers":1,

"dataSinkConfig": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
}
"appId":"hadoopQueueMonitorJob",
"mode":"LOCAL",
application.storm.nimbusHost=localhost,
application.storm.nimbusSeeds=["localhost"],
"workers":1,

"dataSinkConfig": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@
"mode":"LOCAL",
"workers" : 3,
"siteId" : "sandbox",
application.storm.nimbusHost=localhost
application.storm.nimbusSeeds=["localhost"],
topology.message.timeout.secs=1800
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"mode":"LOCAL",
"workers" : 3,
"siteId" : "sandbox",
application.storm.nimbusHost=localhost
application.storm.nimbusSeeds=["localhost"],

"stormConfig" : {
"mrHistoryJobSpoutTasks" : 6,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ application {
type = org.apache.eagle.app.sink.KafkaStreamSink
}
storm {
nimbusHost = "server.eagle.apache.org"
nimbusSeeds = ["server.eagle.apache.org"]
nimbusThriftPort = 6627
}
mailService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"appId":"mrRunningJob",
"mode":"LOCAL",
application.storm.nimbusHost=localhost,
application.storm.nimbusSeeds=["localhost"],
"workers" : 8,
"siteId" : "sandbox",

Expand Down
2 changes: 1 addition & 1 deletion eagle-server-assembly/src/main/conf/eagle.conf
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ application {
provider = org.apache.eagle.app.messaging.KafkaStreamProvider
}
storm {
nimbusHost = "server.eagle.apache.org"
nimbusSeeds = ["server.eagle.apache.org"]
nimbusThriftPort = 6627
}
mailService {
Expand Down
2 changes: 1 addition & 1 deletion eagle-server/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ application {
provider = org.apache.eagle.app.messaging.KafkaStreamProvider
}
storm {
nimbusHost = "server.eagle.apache.org"
nimbusSeeds = ["server.eagle.apache.org"]
nimbusThriftPort = 6627
}
updateStatus: {
Expand Down

0 comments on commit ce016cf

Please sign in to comment.