Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

[EAGLE-906] org.apache.storm 1.x with API changes #812

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.eagle.alert.engine.UnitTopologyMain;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
import backtype.storm.generated.StormTopology;
import org.apache.storm.generated.StormTopology;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static enum StreamingType {
* key - nimbus for storm.
*/
@JsonProperty
private Map<String, String> deployments;
private Map<String, Object> deployments;

public String getName() {
return name;
Expand Down Expand Up @@ -73,15 +73,15 @@ public void setDescription(String description) {
this.description = description;
}

public Map<String, String> getDeployments() {
public Map<String, Object> getDeployments() {
return deployments;
}

public void setDeployments(Map<String, String> deployments) {
public void setDeployments(Map<String, Object> deployments) {
this.deployments = deployments;
}

public static final String NIMBUS_HOST = "nimbusHost";
public static final String NIMBUS_SEEDS = "nimbusSeeds";
public static final String NIMBUS_THRIFT_PORT = "nimbusThriftPort";

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.eagle.alert.engine.model;

import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import backtype.storm.tuple.Tuple;
import org.apache.storm.tuple.Tuple;
import org.apache.commons.lang3.builder.HashCodeBuilder;

import java.io.Serializable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public static class TopologyMeta {
public TopologyUsage usage;

public String clusterId;
public String nimbusHost;
public List<String> nimbusSeeds;
public String nimbusPort;

}

public static class StormClusterMeta {
public String clusterId;
public String nimbusHost;
public List<String> nimbusSeeds;
public String nimbusPort;
public String stormVersion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.alert.coordinator.mock;

import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -62,7 +63,7 @@ public TopologyMeta creatTopology() {
TopologyMeta tm = new TopologyMeta();
tm.topologyId = namePrefix + (i++);
tm.clusterId = "default-cluster";
tm.nimbusHost = "localhost";
tm.nimbusSeeds = Arrays.asList("localhost");
tm.nimbusPort = "3000";
Pair<Topology, TopologyUsage> pair = createEmptyTopology(tm.topologyId);
tm.topology = pair.getLeft();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@

cd $(dirname $0)/../../alert-assembly/

java -cp ${MAVEN_REPO}/org/apache/storm/storm-core/0.9.3/storm-core-0.9.3.jar:target/alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar org.apache.eagle.alert.engine.e2e.SampleClient1
java -cp ${MAVEN_REPO}/org/apache/storm/storm-core/1.0.3/storm-core-1.0.3.jar:target/alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar org.apache.eagle.alert.engine.e2e.SampleClient1

Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@

cd $(dirname $0)/../../alert-assembly/

java -cp ${MAVEN_REPO}/org/apache/storm/storm-core/0.9.3/storm-core-0.9.3.jar:target/alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar org.apache.eagle.alert.engine.e2e.SampleClient2
java -cp ${MAVEN_REPO}/org/apache/storm/storm-core/1.0.3/storm-core-1.0.3.jar:target/alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar org.apache.eagle.alert.engine.e2e.SampleClient2

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.eagle.alert.engine;

import backtype.storm.metric.api.MultiCountMetric;
import org.apache.storm.metric.api.MultiCountMetric;

public class StormMultiCountMetric implements StreamCounter {
private MultiCountMetric countMetric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.eagle.alert.engine;

import backtype.storm.metric.api.MultiCountMetric;
import backtype.storm.task.TopologyContext;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.TopologyContext;
import com.typesafe.config.Config;

public class StreamContextImpl implements StreamContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import org.apache.eagle.alert.config.ZKConfigBuilder;
import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;

import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

Expand Down Expand Up @@ -70,7 +68,7 @@ public static void main(String[] args) throws Exception {
new UnitTopologyRunner(changeNotifyService).run(topologyId, config);
}

public static void runTopology(Config config, backtype.storm.Config stormConfig) {
public static void runTopology(Config config, org.apache.storm.Config stormConfig) {
// load config and start
String topologyId = getTopologyName(config);
ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.partition.GlobalGrouping;
import org.apache.storm.trident.partition.GlobalGrouping;

import java.util.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
*/
package org.apache.eagle.alert.engine.router.impl;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;

import java.io.Serializable;
import java.util.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eagle.alert.engine.router.impl;

import backtype.storm.task.OutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.router.StreamOutputCollector;
import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
import org.apache.eagle.alert.engine.serialization.Serializers;
import org.apache.eagle.alert.utils.AlertConstants;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@

import com.typesafe.config.Config;

import backtype.storm.metric.api.MultiCountMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

/**
* Since 5/1/16.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
package org.apache.eagle.alert.engine.runner;

import backtype.storm.metric.api.MultiCountMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import com.typesafe.config.Config;
import org.apache.eagle.alert.coordination.model.PublishSpec;
import org.apache.eagle.alert.engine.StreamContextImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import org.apache.eagle.alert.metric.IMetricSystem;
import org.apache.eagle.alert.metric.MetricSystem;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;
import com.codahale.metrics.Gauge;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

import org.apache.eagle.alert.metric.IMetricSystem;
import org.apache.eagle.alert.metric.MetricSystem;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;
import com.codahale.metrics.Gauge;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@

import com.typesafe.config.Config;

import backtype.storm.metric.api.MultiCountMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;

public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider {
private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@
import org.apache.eagle.alert.utils.StreamIdConversion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigRenderOptions;

Expand Down Expand Up @@ -69,13 +67,13 @@ public class UnitTopologyRunner {
public static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;

private final IMetadataChangeNotifyService metadataChangeNotifyService;
private backtype.storm.Config givenStormConfig = null;
private org.apache.storm.Config givenStormConfig = null;

public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService) {
this.metadataChangeNotifyService = metadataChangeNotifyService;
}

public UnitTopologyRunner(ZKMetadataChangeNotifyService changeNotifyService, backtype.storm.Config stormConfig) {
public UnitTopologyRunner(ZKMetadataChangeNotifyService changeNotifyService, org.apache.storm.Config stormConfig) {
this(changeNotifyService);
this.givenStormConfig = stormConfig;
}
Expand All @@ -94,7 +92,7 @@ private void run(String topologyId,
Config config,
boolean localMode) {

backtype.storm.Config stormConfig = givenStormConfig == null ? new backtype.storm.Config() : givenStormConfig;
org.apache.storm.Config stormConfig = givenStormConfig == null ? new org.apache.storm.Config() : givenStormConfig;
// TODO: Configurable metric consumer instance number

int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS) ? config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

package org.apache.eagle.alert.engine.scheme;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -49,12 +52,21 @@ public Fields getOutputFields() {
return new Fields("f1");
}

public static String deserializeString(ByteBuffer buffer) {
if (buffer.hasArray()) {
int base = buffer.arrayOffset();
return new String(buffer.array(), base + buffer.position(), buffer.remaining());
} else {
return new String(Utils.toByteArray(buffer), StandardCharsets.UTF_8);
}
}

@Override
@SuppressWarnings("rawtypes")
public List<Object> deserialize(byte[] ser) {
public List<Object> deserialize(ByteBuffer ser) {
try {
if (ser != null) {
Map map = mapper.readValue(ser, Map.class);
Map map = mapper.readValue(deserializeString(ser), Map.class);
return Arrays.asList(topic, map);
} else {
if (LOG.isDebugEnabled()) {
Expand All @@ -63,7 +75,7 @@ public List<Object> deserialize(byte[] ser) {
}
} catch (IOException e) {
try {
LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e);
LOG.error("Failed to deserialize as JSON: {}", new String(ser.array(), "UTF-8"), e);
} catch (Exception ex) {
LOG.error(ex.getMessage(), ex);
}
Expand Down
Loading