Missed element in replaysubject #404

Open
diorcety opened this issue Sep 22, 2017 · 4 comments

Comments

@diorcety
Contributor

An element can be missed in replaysubject if on_next is called between lines 167 and 170 of https://github.com/Reactive-Extensions/RxCpp/blob/master/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
We should use a mutex to make on_next and the add in replay_observer mutually exclusive in order to avoid that.
PS: It can be difficult to reproduce, but the issue is easy to see in the code.
You can see an ugly fix here: diorcety@3fdfd9b
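
To make the proposal concrete, here is a minimal sketch of the idea in plain standard C++. It is not the RxCpp code and not the linked commit: `locked_replay`, its `int` payload, and `demo_locked_replay` are hypothetical names invented for illustration. It assumes a single mutex that guards both the replay buffer and the subscriber list, so no value can arrive between replaying the buffer and registering the new subscriber.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical, simplified replay subject; NOT the RxCpp implementation.
class locked_replay {
public:
    using callback = std::function<void(int)>;

    explicit locked_replay(std::size_t capacity) : capacity_(capacity) {}

    // Store the value for later subscribers and deliver it to current ones.
    void on_next(int value) {
        std::lock_guard<std::mutex> guard(lock_);
        buffer_.push_back(value);
        if (buffer_.size() > capacity_) {
            buffer_.pop_front();  // values are dropped only from the front
        }
        for (auto& cb : subscribers_) {
            cb(value);
        }
    }

    // Replay the buffer and register the subscriber under the same lock,
    // so a concurrent on_next cannot fall between the two steps.
    // Caveat: a callback that calls on_next again would re-lock this mutex
    // from the same thread (undefined behavior for std::mutex, a deadlock
    // in practice).
    void subscribe(callback cb) {
        std::lock_guard<std::mutex> guard(lock_);
        for (int value : buffer_) {
            cb(value);
        }
        subscribers_.push_back(std::move(cb));
    }

private:
    std::mutex lock_;
    std::size_t capacity_;
    std::deque<int> buffer_;
    std::vector<callback> subscribers_;
};

// Single-threaded walk-through with a replay buffer of two values.
std::vector<int> demo_locked_replay() {
    locked_replay subject(2);
    std::vector<int> seen;
    subject.on_next(1);
    subject.on_next(2);
    subject.on_next(3);  // buffer is now {2, 3}; 1 was dropped from the front
    subject.subscribe([&seen](int v) { seen.push_back(v); });
    assert(seen.size() == 2);  // the replay delivered 2 and 3
    subject.on_next(4);        // live value follows the replay with no gap
    return seen;
}
```

The walk-through is single-threaded, so it only shows the ordering guarantee (a late subscriber sees a contiguous suffix of the stream); the lock matters once on_next and subscribe run on different threads. While subscribe holds the lock, every on_next caller waits for the replay to finish.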

@kirkshoop
Member

thank you for the report!

yes, you are correct: values arriving during that window in subscribe are lost for that subscriber.

this is a case of tradeoffs.

The current code drops events from the middle, which is unexpected. the expectation is to drop only from the front.

Using a mutex to hold back new items would block all subscriptions from receiving incoming values until every new subscription had caught up. (potentially forever in the case of slow consumers and large replay buffers and many new subscriptions)

Using a queue per subscription would buffer the values that arrive before all the pending values are consumed. (potentially infinite queueing in the case of slow consumers and large replay buffers and many new subscriptions)

replay is often used to limit queueing, and so a queueing solution would not be expected any more than the current dropping of events from the middle.

replay is a subject and therefore it is at times used to feed output values back in as input. the mutex option would potentially make this a deadlock.

perhaps another option would be to interleave replay values with live values. I expect that this violates the expectations as well.

This will require some time to check with other implementations of replay to see how they are implemented and pick the best outcome for rxcpp.

I would appreciate more opinions and options from anyone.

@diorcety
Contributor Author

Maybe I'm using the wrong tool then. I'm using a subject that emits replies from network communication. The listener's subscription can occur after the first reply has been received. I need a tool that ensures every received reply is handled (unlike replaysubject here) and cached until processed.

@kirkshoop
Member

usually Rx expressions are structured so that the request is not made until the listener has subscribed.

one way to structure the expression to allow the listener to subscribe later would be to reduce all the replies into a container, like a vector<>, and then, after all the replies are complete, emit the final vector. another option is to use scan instead of reduce; then the whole container would be emitted every time a reply was added (this works best with an immutable collection, not a std::vector).
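
The difference between the two options can be sketched without RxCpp. The functions below only mimic the semantics of reduce and scan on a finished list of replies; `reduce_replies`, `scan_replies`, and the `int` reply type are hypothetical names chosen for this example, not library calls.

```cpp
#include <cassert>
#include <vector>

// reduce-style: fold every reply into one container and emit it once,
// after the last reply has arrived.
std::vector<int> reduce_replies(const std::vector<int>& replies) {
    std::vector<int> collected;
    for (int reply : replies) {
        collected.push_back(reply);
    }
    return collected;
}

// scan-style: emit the whole container after each reply. Every emission
// here is a full copy of the std::vector, which is the cost that an
// immutable (structurally shared) collection would avoid.
std::vector<std::vector<int>> scan_replies(const std::vector<int>& replies) {
    std::vector<std::vector<int>> emissions;
    std::vector<int> collected;
    for (int reply : replies) {
        collected.push_back(reply);
        emissions.push_back(collected);  // snapshot of the container so far
    }
    return emissions;
}
```

For replies 1, 2, 3 the reduce-style function yields a single vector holding all three, while the scan-style function yields three snapshots that grow by one reply each. A listener that subscribes late to the reduce result gets everything in one value; with scan it can pick up the most recent snapshot.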

@diorcety
Contributor Author

diorcety commented Sep 28, 2017

I may have found a difference between rxpy and rxcpp.
https://github.com/Reactive-Extensions/RxCpp/blob/master/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp#L166
https://github.com/ReactiveX/RxPY/blob/master/rx/subjects/replaysubject.py#L68

In rxpy, if a replay observer subscribes after the source has sent the on_completed event, the replay subject replays all the on_next events and then the on_completed event.

In rxcpp it only sends the on_completed event.
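
A small sketch of the replay-then-complete behavior described for rxpy, in plain standard C++. It does not reproduce either library's code: `completing_replay`, `observer`, and `demo_late_subscriber` are hypothetical names, the buffer is unbounded, and there is no locking, purely to keep the example short.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical observer with only the two callbacks needed here.
struct observer {
    std::function<void(int)> on_next;
    std::function<void()> on_completed;
};

// Hypothetical replay subject that keeps its buffer after completion.
class completing_replay {
public:
    void on_next(int value) {
        if (completed_) return;  // ignore values after completion
        buffer_.push_back(value);
        for (auto& o : observers_) o.on_next(value);
    }

    void on_completed() {
        if (completed_) return;
        completed_ = true;
        for (auto& o : observers_) o.on_completed();
        observers_.clear();
    }

    // A late subscriber first receives every buffered value and then,
    // if the source has already completed, the completion event.
    void subscribe(observer o) {
        for (int value : buffer_) o.on_next(value);
        if (completed_) {
            o.on_completed();
            return;
        }
        observers_.push_back(std::move(o));
    }

private:
    bool completed_ = false;
    std::vector<int> buffer_;
    std::vector<observer> observers_;
};

// Subscribe only after the source has completed and log what arrives.
std::vector<std::string> demo_late_subscriber() {
    completing_replay subject;
    std::vector<std::string> log;
    subject.on_next(1);
    subject.on_next(2);
    subject.on_completed();
    subject.subscribe(observer{
        [&log](int v) { log.push_back("next " + std::to_string(v)); },
        [&log]() { log.push_back("completed"); }});
    assert(!log.empty());  // the late subscriber still received events
    return log;
}
```

With this sketch the late subscriber logs both values followed by the completion. The behavior reported above for rxcpp would correspond to a log containing the completion alone.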
