[STORM-4000] Processing late tuples from BaseWindowedBolt results in serialization exception #7782

Open
jira-importer opened this issue Nov 10, 2023 · 5 comments

Comments

@jira-importer
Collaborator

I am developing an Apache Storm (v2.5.0) topology that reads events from a spout (BaseRichSpout), counts the events in tumbling windows (BaseWindowedBolt), and prints the count (BaseRichBolt). The topology works fine, but my dataset contains some out-of-order events. BaseWindowedBolt provides a withLateTupleStream method to route late events to a separate stream. However, when I try to process late events, I get a serialization exception:

Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
Note: To register this class use: kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
Serialization trace:
defaultResources (org.apache.storm.task.WorkerTopologyContext)
context (org.apache.storm.tuple.TupleImpl)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
    ... 6 more

I have defined my topology as below:
 
 

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.TupleImpl;

public class TestTopology {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        // Serialize tuples even for local transfers so serialization problems surface in local mode.
        config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
        config.registerSerialization(TupleImpl.class);
        config.registerSerialization(WorkerTopologyContext.class);
        config.registerSerialization(Fields.class);
        LocalCluster cluster = new LocalCluster();

        try (LocalCluster.LocalTopology topology =
                 cluster.submitTopology("testTopology", config, getTopology().createTopology())) {
            Thread.sleep(50000);
        }
        cluster.shutdown();
    }

    static TopologyBuilder getTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("eventSpout", new LateEventSpout());
        builder.setBolt("windowBolt", new WindowBolt()
                .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
                .withTimestampField("time")
                .withLateTupleStream("lateEvents"))
            .shuffleGrouping("eventSpout");
        // Consumes only the late-tuple stream of windowBolt.
        builder.setBolt("latePrintBolt", new LatePrintBolt())
            .shuffleGrouping("windowBolt", "lateEvents");
        builder.setBolt("printBolt", new PrintBolt()).shuffleGrouping("windowBolt");
        return builder;
    }
}

Where `LateEventSpout` is

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class LateEventSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private List<Long> eventTimes;
    private int currentTime = 0;
    private int id = 1;

    public LateEventSpout() {
        eventTimes = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
        } // eventTimes = [1000, 2000, ..., 61000] ms, i.e. epoch + 1 s ... epoch + 61 s
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        int eventId = id++;
        Long eventTime = eventTimes.get(currentTime++);
        // Wrap around after the last timestamp; the replayed timestamps are the late events.
        if (currentTime == eventTimes.size()) {
            currentTime = 0;
        }
        collector.emit(new Values(eventId, eventTime));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "time"));
    }
}

And `WindowBolt` is:

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

public class WindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        // Count the tuples in the current window.
        int sum = 0;
        for (Tuple event : inputWindow.get()) {
            sum++;
        }
        collector.emit(new Values(inputWindow.getStartTimestamp(), inputWindow.getEndTimestamp(), sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("start", "end", "sum"));
    }
}

 
And `PrintBolt` just prints the `windowBolt` output (`LatePrintBolt` is similar).
If I don't add `LatePrintBolt` to the `TopologyBuilder`, I get the correct results.
 

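import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt {
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        // Nothing to prepare: this bolt only prints and never emits.
    }

    @Override
    public void execute(Tuple input) {
        System.out.println(String.format("Start: %d, End: %d, Sum:%d",
            input.getLongByField("start"), input.getLongByField("end"), input.getIntegerByField("sum")));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No output fields: this is a terminal bolt.
    }
}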

Start: 0, End: 10000, Sum:10
Start: 10000, End: 20000, Sum:10
Start: 20000, End: 30000, Sum:10
Start: 30000, End: 40000, Sum:10
Start: 40000, End: 50000, Sum:10
Start: 50000, End: 60000, Sum:10 
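
The counts above follow from how ten-second tumbling windows partition the spout's timestamps. The following is a minimal plain-Java sketch, not Storm code. It assumes windows are intervals of the form (start, end], which matches the printed output, and it treats an event as late as soon as its timestamp is not greater than the largest timestamp seen before it (zero lag and an instantaneous watermark, both simplifying assumptions). The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {
    static final long WINDOW_MS = 10_000L;

    /** Start of the window (start, start + WINDOW_MS] that contains ts, for ts > 0. */
    static long windowStart(long ts) {
        return ((ts - 1) / WINDOW_MS) * WINDOW_MS;
    }

    /** Number of events per window, keyed by window start. */
    static Map<Long, Integer> countPerWindow(long[] timestamps) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts), 1, Integer::sum);
        }
        return counts;
    }

    /** Index of the first event that is not newer than every event before it, or -1 if none. */
    static int firstLateIndex(long[] timestamps) {
        long maxSeen = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] <= maxSeen) {
                return i;
            }
            maxSeen = timestamps[i];
        }
        return -1;
    }

    /** Same sequence as the spout: 1 s to 61 s in milliseconds, then wrapping around. */
    static long[] spoutTimestamps(int n) {
        long[] out = new long[n];
        for (int i = 0; i < n; i++) {
            out[i] = ((i % 61) + 1) * 1000L;
        }
        return out;
    }

    public static void main(String[] args) {
        countPerWindow(spoutTimestamps(61)).forEach((start, count) ->
            System.out.printf("Start: %d, End: %d, Sum:%d%n", start, start + WINDOW_MS, count));
        System.out.println("First late index: " + firstLateIndex(spoutTimestamps(70)));
    }
}
```

In this model the sketch also reports a seventh window starting at 60000 that holds only the 61 s event; in the real topology that window has not closed when the spout wraps around, which is the point at which the replayed 1 s event becomes the first late tuple.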

 
However, when I also print the lateEvents stream, I get the same output, but the first late event triggers the exception shown above.

I have debugged the issue. When WindowedBoltExecutor receives a late tuple, it emits that tuple as a value, and BoltOutputCollectorImpl wraps it in a new Tuple. The new tuple therefore contains the original tuple, which holds a WorkerTopologyContext that is not serializable, hence the error.
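
The failure mode can be reproduced in miniature without Storm. The sketch below is only an analogy: it uses `java.io` serialization as a stand-in for Kryo, and `FakeTuple` and `FakeContext` are hypothetical classes, not Storm types. It shows that a list of values serializes fine on its own, while a list that wraps a tuple-like object holding a non-serializable context does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NestedTupleSketch {
    /** Hypothetical stand-in for a runtime context that cannot be serialized. */
    static class FakeContext { }

    /** Hypothetical stand-in for a tuple: plain values plus a reference to the context. */
    static class FakeTuple implements Serializable {
        final ArrayList<Object> values;
        final FakeContext context;

        FakeTuple(List<?> values, FakeContext context) {
            this.values = new ArrayList<>(values);
            this.context = context;
        }
    }

    /** Returns true if the whole object graph can be written with java.io serialization. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FakeTuple late = new FakeTuple(Arrays.asList(1, 1000L), new FakeContext());

        // Analogous to emitting the late tuple itself as the single value.
        List<Object> wrapped = new ArrayList<>();
        wrapped.add(late);

        System.out.println("tuple wrapped as a value: " + canSerialize(wrapped));
        System.out.println("plain values only:        " + canSerialize(late.values));
    }
}
```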

 


Originally reported by jawadtahir, imported from: Processing late tuples from BaseWindowedBolt results in serialization exception
  • status: Open
  • priority: Major
  • resolution: Unresolved
  • imported: 2025-01-24
@jira-importer
Collaborator Author

rzo1:

Does it also happen with 2.6.0? We updated Kryo.

@jira-importer
Collaborator Author

JIRAUSER302864:

Hi rzo1 ,

Thank you for your response, and sorry for the late reply; I have been away from my machine.

I just checked it, and the new version does not solve the problem.

 

As far as I understand, the problem is not with serialization itself but with how late tuples are handled. The input in WindowedBoltExecutor is already a Tuple. The tuple contains WorkerTopologyContext, which is not serializable (some volatile attributes), hence the error. In my opinion, we should change the line to

windowedOutputCollector.emit(lateTupleStream, input, input.getValues()); 

instead of 

windowedOutputCollector.emit(lateTupleStream, input, new Values(input)); 

 

What are your thoughts on it?

 

@jira-importer
Collaborator Author

rzo1:

Sounds valid. Do you want to submit a PR with an accompanying test for it?

@jira-importer
Collaborator Author

JIRAUSER302864:

Hi rzo1 ,

 

I have been trying to fix this bug for the past two days. Upon closer inspection, I found that it is a much bigger problem than just changing the parameters of the emit function.

We cannot change it to input.getValues() as we define only one output field. By design, it expects a tuple. However, a tuple can never be serialized due to some volatile attributes. Hence, lateTupleStream will only work when there is no serialization.
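
The mismatch can be sketched in plain Java. This is an illustration under stated assumptions, not Storm's actual validation code: `matchesDeclaredFields` is a hypothetical helper, and the single field name `late_tuple` is assumed here for the late stream's declared field. The original tuple carries two values (id, time), so its values do not fit a one-field stream, whereas wrapping the whole tuple as one value does.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LateStreamFieldsSketch {
    /** Hypothetical check: an emission fits a stream only if it has one value per declared field. */
    static boolean matchesDeclaredFields(List<String> declaredFields, List<Object> values) {
        return declaredFields.size() == values.size();
    }

    public static void main(String[] args) {
        // Assumed declaration of the late stream: a single field.
        List<String> lateStreamFields = Collections.singletonList("late_tuple");

        // Values of the original tuple from the spout: id and time.
        List<Object> originalValues = Arrays.<Object>asList(1, 1000L);

        // Emitting the tuple's values directly: two values for one field.
        System.out.println("emit values directly: "
            + matchesDeclaredFields(lateStreamFields, originalValues));

        // Emitting the tuple as a single wrapped value: one value for one field.
        List<Object> wrappedTuple = Collections.<Object>singletonList(originalValues);
        System.out.println("emit wrapped tuple:   "
            + matchesDeclaredFields(lateStreamFields, wrappedTuple));
    }
}
```

This is why the fix is larger than swapping the argument: the wrapped form fits the declared field but drags the non-serializable tuple along, and the unwrapped form serializes but no longer fits the declared field.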

I think we need the input of original authors kosii and arunm on how to solve this bug.

@jira-importer
Collaborator Author

rzo1:

Given the state of community health lately and the discussion earlier this year about moving to the Attic, I doubt that there will be much traction from the original authors. If you can think of a good solution, feel free to provide a PR or send a mail to the dev@ list to discuss a proposal in more depth.
