Description
I am trying to use a connectable observable to allow multiple observers to watch some data and transform it. If an error occurs in the observable, I would like to use the StopOnError functionality to stop the whole pipeline. However, whenever I throw an error from the FlatMap operation in this observable, the program deadlocks in gopark with waitReason: waitReasonChanReceive. It seems like a channel is waiting to receive the error, but nothing is ever sent to it. I do not understand the implementation of Connectable observables well enough to know why this might be the case.
I have tried multiple ways of creating the Error, like rxgo.Thrown(err), rxgo.FromChannel(rxgo.JustItem(rxgo.Error(err)).Observe()), manually creating a channel and putting the error item onto it, etc.
Is this a bug? Is there some way to work around this?
To Reproduce
- Create an Observable using FromChannel()
- Create a Connectable observable by calling FlatMap on an observable and passing the rxgo.WithPublishStrategy() option
- Make the FlatMap operation throw an error
- Create an observer of this observable
- Call Connect() on the observable
- Try to get an item from the Observer (e.g. First())
- Deadlock
Expected behavior
The program should not deadlock and the Observer should resolve to the error.