interval emits events at once on second subscription #413

Open
nikobarli opened this issue Nov 9, 2017 · 3 comments
Comments

@nikobarli

I am not sure whether this is intended behavior.

The following code

        auto sc = rxsc::make_new_thread();
        auto so = rx::synchronize_in_one_worker(sc);

        auto cob = rxcpp::observable<>::interval(std::chrono::seconds(1), so)
            .take(5)
            ;
        cob.subscribe([](int v) {cout << time_point_to_string(std::chrono::system_clock::now()) << " subscribe A: " << v << endl; });
        Sleep(5000);
        cob.subscribe([](int v) {cout << time_point_to_string(std::chrono::system_clock::now()) << " subscribe B: " << v << endl; });
        Sleep(5000);

outputs the following:

2017.11.09-14.11.27.606 subscribe A: 1
2017.11.09-14.11.28.605 subscribe A: 2
2017.11.09-14.11.29.606 subscribe A: 3
2017.11.09-14.11.30.605 subscribe A: 4
2017.11.09-14.11.31.605 subscribe A: 5
2017.11.09-14.11.32.607 subscribe B: 1
2017.11.09-14.11.32.610 subscribe B: 2
2017.11.09-14.11.32.612 subscribe B: 3
2017.11.09-14.11.32.614 subscribe B: 4
2017.11.09-14.11.32.616 subscribe B: 5

The first subscription (A) receives its events at 1-second intervals, as expected. However, the second subscription (B) receives its events only a few milliseconds apart.

For completeness, the 'time_point_to_string' function is as follows:

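// Takes a const reference so that a temporary such as system_clock::now() can bind portably.
std::string time_point_to_string(const std::chrono::system_clock::time_point &tp)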
{
    using namespace std;
    using namespace std::chrono;

    auto ttime_t = system_clock::to_time_t(tp);
    auto tp_sec = system_clock::from_time_t(ttime_t);
    milliseconds ms = duration_cast<milliseconds>(tp - tp_sec);

    std::tm * ttm = localtime(&ttime_t);

    char date_time_format[] = "%Y.%m.%d-%H.%M.%S";

    char time_str[] = "yyyy.mm.dd.HH-MM.SS.fff";

    // Pass the full buffer size, including room for the terminating null.
    strftime(time_str, sizeof(time_str), date_time_format, ttm);

    string result(time_str);
    result.append(".");
    result.append(to_string(ms.count()));

    return result;
}

@kirkshoop
Member

Hi, thanks for the report! This is by design, but unintended.

I need to think about this a bit.

@djleach-belcan

djleach-belcan commented Feb 17, 2018

Similarly, the code below prints the numbers 1 through 4 with 1 second in between, then immediately prints 1 through 7 before continuing to print another number every 500ms. This surprised me since the documentation for concat states that subsequent observables won't be subscribed to until the preceding ones complete.

auto stream = rxcpp::observable<>::interval(1000ms)
	.take(4)
	.concat(rxcpp::observable<>::interval(500ms));
stream.subscribe([](auto&& value){ std::cout << value << std::endl; });

This is actually quite frustrating. I'm finding it impossible to create a stream that produces values at one interval and then, once the first completes, produces values at a different interval. Even the following code behaves the same as the code above, which makes no sense to me, since the second interval stream isn't even connected until the first completes.

auto first = rxcpp::observable<>::interval(1000ms).take(4);
auto second = rxcpp::observable<>::interval(500ms).take(10).publish();

first.subscribe(
	[](auto&& value)
	{
		std::cerr << "F Value: " << value << std::endl;
	},
	[](auto&&){},
	[&]
	{
		std::cerr << "First Completed" << std::endl;
		second.subscribe(
			[](auto&& value)
			{
				std::cerr << "S Value: " << value << std::endl;
			},
			[](auto&&){},
			[&]
			{
				std::cerr << "Second Completed" << std::endl;
			});
		second.connect();
	});

@zued

zued commented Aug 20, 2019

Looking at rx-interval.hpp, initial.initial (the starting time point) is initialized with now() in the constructor, and that causes the problem: every subscription shares the start time captured when the observable was created.
The problem seems to be fixed by:

  • Initializing initial.initial to an invalid value such as time_point::min() in the constructor.
  • Using now() in on_subscribe() if initial.initial is invalid.
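
To illustrate why a start time captured at construction makes a late subscriber receive its events at once, here is a minimal standalone sketch. It does not use rxcpp and is not the library's implementation: interval_model, sim_time and ticks_due_on_subscribe are hypothetical names, the clock is simulated, and the model assumes tick i is due at start + i * period.

```cpp
#include <chrono>

using ms = std::chrono::milliseconds;
// Simulated time points in milliseconds; no real clock is read.
using sim_time = std::chrono::time_point<std::chrono::steady_clock, ms>;

// Hypothetical stand-in for the interval source; this is not rxcpp's actual type.
struct interval_model {
    sim_time initial;  // sim_time::min() means "no start time chosen yet"
    ms period;
};

// Counts how many of the first `count` ticks are already due when a
// subscriber arrives at `now`. Ticks that are already due fire back to back.
int ticks_due_on_subscribe(const interval_model& src, sim_time now, int count) {
    // Proposed fix: an unset start time falls back to the subscription time.
    const sim_time start = (src.initial == sim_time::min()) ? now : src.initial;
    int due = 0;
    for (int i = 0; i < count; ++i) {
        if (start + i * src.period <= now) {
            ++due;
        }
    }
    return due;
}
```

In this model, a 1-second interval whose start time was stored at time 0 has all 5 of its first ticks due for a subscriber arriving at 5000 ms, which matches subscription B above. With the start time left unset, the same subscriber sees only the first tick due. A 500 ms interval created at time 0 and subscribed at 3000 ms has 7 ticks due, which is consistent with the concat report above, assuming the first stream completes about 3 seconds after creation.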

rx-interval.hpp.txt

Wrapping the interval in defer also works as a workaround, since the interval is then created anew for each subscription:

auto sc = rxsc::make_new_thread();
auto so = rx::synchronize_in_one_worker(sc);

auto cob = rxcpp::observable<>::defer([so]() { return rxcpp::observable<>::interval(std::chrono::seconds(1), so); })
    .take(5)
    ;
