Zip on an Interval and From with Repeat hangs #432

Open
djleach-belcan opened this issue Feb 17, 2018 · 1 comment

djleach-belcan commented Feb 17, 2018

I expected the code below to print the following with a 100ms delay between lines:

Value: 1
Value: 2
Value: 1
Value: 2

Instead, it just hangs. If I specify the number of times to repeat, it works as expected, but I need it to repeat forever (I will replace take(4) with take_while). Am I missing something, or is this a bug?

auto interval = rxcpp::observable<>::interval(std::chrono::steady_clock::now(), 100ms);
auto source = rxcpp::observable<>::from(1, 2) | rxcpp::repeat();
interval
	| rxcpp::zip([](auto&&, auto&& source)
		{
			return source;
		}, source)
	| rxcpp::take(4)
	| rxcpp::subscribe<int>([&](auto&& value)
		{
			std::cerr << "Value: " << value << std::endl;
		});

I'm compiling with Clang 4.0.1-6 on Linux.

This code segfaults:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o3 = rxcpp::observable<>::from(1, 2)
	.repeat();
auto values = o1 | rxcpp::operators::zip(
	[](int v1, int v2, int v3) {
		return 100 * v1 + 10 * v2 + v3;
	},
	o2, o3);
values.
	take(3).
	subscribe(
		[](int v){printf("OnNext: %d\n", v);},
		[](){printf("OnCompleted\n");});

kirkshoop (Member) commented

Hi, thanks for the report.

repeat has a stack recursion bug: it needs to schedule the subscribe.
The workaround is to add an explicit schedule (subscribe_on) for the subscribe above the repeat.

Workaround for the hang:

auto interval = rxcpp::observable<>::interval(std::chrono::steady_clock::now(), std::chrono::milliseconds(100));
auto source = rxcpp::observable<>::from(1, 2) | rxcpp::operators::subscribe_on(rxcpp::identity_current_thread()) | rxcpp::operators::repeat();
interval
	| rxcpp::operators::zip([](long, int source)
		{
			return source;
		}, source)
	| rxcpp::operators::take(4)
	| rxcpp::operators::subscribe<int>([](int value)
		{
			std::cerr << "Value: " << value << std::endl;
		});

Workaround for the segfault:

auto o1 = rxcpp::observable<>::interval(std::chrono::milliseconds(1));
auto o2 = rxcpp::observable<>::interval(std::chrono::milliseconds(2));
auto o3 = rxcpp::observable<>::from(1, 2)
	.subscribe_on(rxcpp::identity_current_thread())
	.repeat();
auto values = o1 | rxcpp::operators::zip(
	[](int v1, int v2, int v3) {
		return 100 * v1 + 10 * v2 + v3;
	},
	o2, o3);
values.
	take(3).
	subscribe(
		[](int v){printf("OnNext: %d\n", v);},
		[](){printf("OnCompleted\n");});
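
To illustrate why scheduling the subscribe matters, here is a toy model using only the standard library. It is not rxcpp's implementation; DepthProbe, repeat_recursive and repeat_scheduled are hypothetical names made up for this sketch. It assumes a source that completes synchronously (like from(1, 2)) and compares re-subscribing directly from the completion path with queuing the re-subscribe on a simple trampoline loop, which is roughly the role a current-thread scheduler plays.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Toy model only: NOT rxcpp code. Tracks how deeply "subscribe" calls nest.
struct DepthProbe {
    std::size_t current = 0;  // subscribe calls currently on the stack
    std::size_t max = 0;      // deepest nesting observed so far
    void enter() { ++current; if (current > max) max = current; }
    void leave() { --current; }
};

// Re-subscribes directly from inside the completion path.
// Every repetition nests inside the previous one, so depth grows without bound
// when the repeat count is unbounded.
void repeat_recursive(std::size_t remaining, DepthProbe& probe) {
    if (remaining == 0) return;
    probe.enter();
    // (the source would emit its values here and then complete synchronously)
    repeat_recursive(remaining - 1, probe);  // re-subscribe inside "on_completed"
    probe.leave();
}

// Queues the re-subscribe and runs it from a flat loop (a trampoline).
// Each subscribe returns before the next one starts, so depth stays constant.
void repeat_scheduled(std::size_t times, DepthProbe& probe) {
    std::deque<std::function<void()>> queue;
    std::size_t remaining = times;
    std::function<void()> subscribe = [&] {
        if (remaining == 0) return;
        probe.enter();
        --remaining;
        queue.push_back(subscribe);  // schedule the next subscribe, do not call it
        probe.leave();
    };
    queue.push_back(subscribe);
    while (!queue.empty()) {
        auto task = std::move(queue.front());
        queue.pop_front();
        task();
    }
    assert(probe.current == 0);  // every queued subscribe has returned
}
```

With a fresh DepthProbe for each call, repeat_recursive(1000, probe) leaves probe.max at 1000, while repeat_scheduled(1000, probe) leaves it at 1. The subscribe_on workaround above presumably has a similar effect in rxcpp: each re-subscribe is handed to a scheduler instead of being nested inside the previous completion.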
