feat: close producer when topic is deleted#4147
feat: close producer when topic is deleted#4147fraidev wants to merge 2 commits intofluvio-community:masterfrom
Conversation
43c2c9e to
66461d5
Compare
sehz
left a comment
There was a problem hiding this comment.
I don't think this is right approach. This should work for all client not just CLI. client should detect if topic is closed then close connection
| [dependencies] | ||
| async-channel = { workspace = true } | ||
| async-trait = { workspace = true } | ||
| async-std = { workspace = true } |
Make sense! But the producer is never what is blocking. #[connector(source)]
async fn start(config: CustomConfig, producer: TopicProducerPool) -> Result<()> {
let source = TestJsonSource::new(&config)?;
let mut stream = source.connect(None).await?;
while let Some(item) = stream.next().await {
println!("producing a value: {}", &item);
producer.send(RecordKey::NULL, item).await?;
}
Ok(())
}Maybe we can change TopicProducerPool to detect an error when the topic is deleted and also add a method to detect it, but we will also need to add something like Another solution that I had is have a method that receives a stream and returns a stream that handles it. async fn from_stream(stream: GenericStream) -> ProducerStreamBut we will need to change implementations too: #[connector(source)]
async fn start(config: CustomConfig, producer: TopicProducerPool) -> Result<()> {
let source = TestJsonSource::new(&config)?;
let mut stream = source.connect(None).await?;
while let Some(item) = producer.from_stream(stream).await {
println!("producing a value: {}", &item);
producer.send(RecordKey::NULL, item).await?;
}
Ok(())
}What do you think? |
|
Is cluster side socket already closed by the SPU? I think that's the first thing to handle for this case. In addition, if possible it would be nice for the SPU to send a message that the produce stream is being closed due to topic delete, and then close the spu side socket. |
Yeah, I think so. The SPU already send a But I think that we will have the same problem. All usage of producer seems to be:
The I think that this issue aims to be notified before the input. |
|
Stale pull request message |
Related: #3836
We already close the consumer when the topic is deleted, now we will close the producer too.
Given:
When:
Then:
Topic "my-topic" was deleted