Take causes memory leak #452

Open
simonvpe opened this issue Aug 1, 2018 · 4 comments

@simonvpe

simonvpe commented Aug 1, 2018

There is a circular dependency here. I presume I'll have to figure out how to resolve it myself. It would be a good idea to run the tests with valgrind or sanitizers, though, since a lot of issues are about memory leaks.

    template<class Subscriber>
    void on_subscribe(const Subscriber& s) const {

        typedef Subscriber output_type;
        struct state_type
            : public std::enable_shared_from_this<state_type>
            , public values
        {
            state_type(const values& i, const output_type& oarg)
                : values(i)
                , mode_value(mode::taking)
                , out(oarg)
            {
            }
            typename mode::type mode_value;
            output_type out;
        };
        // take a copy of the values for each subscription
        auto state = std::make_shared<state_type>(initial, s);

        composite_subscription source_lifetime;

        s.add(source_lifetime);

        state->source.subscribe(
        // split subscription lifetime
            source_lifetime,
        // on_next
            [state, source_lifetime](T t) {
                if (state->mode_value < mode::triggered) {
                    if (--state->count > 0) {
                        state->out.on_next(t);
                    } else {
                        state->mode_value = mode::triggered;
                        state->out.on_next(t);
                        // must shutdown source before signaling completion
                        source_lifetime.unsubscribe();
                        state->out.on_completed();
                    }
                }
            },
        // on_error
            [state](std::exception_ptr e) {
                state->mode_value = mode::errored;
                state->out.on_error(e);
            },
        // on_completed
            [state]() {
                state->mode_value = mode::stopped;
                state->out.on_completed();
            }
        );
    }

@kirkshoop
Member

Yes, there are many circular references in rxcpp. These are broken by the unsubscribe signal. Is there an expression that actually demonstrates a leak?

@simonvpe
Author

simonvpe commented Aug 2, 2018

After producing an MCVE, I'm no longer sure that take is the problem.

#include <rxcpp/rx.hpp>
#include <iostream>

struct foo : public rxcpp::sources::source_base<int> {
  static int refcount;

  explicit foo() { std::cout << "++refcount = " << ++refcount << '\n'; }
  ~foo() { std::cout << "--refcount = " << --refcount << '\n'; }
  foo(const foo &other) { std::cout << "++refcount = " << ++refcount << '\n'; }
  foo(foo &&) { std::cout << "++refcount = " << ++refcount << '\n'; }
  foo &operator=(foo &&) = default;
  foo &operator=(const foo &) = default;

  template <typename Subscriber>
  void on_subscribe(Subscriber subscriber) const {
    for (auto i = 0; i < 100; ++i) {
      subscriber.on_next(i);
    }
    subscriber.on_completed();
  }
};

auto create_foo_observable() {
  return rxcpp::observable<int, foo>{foo{}}
      .lift<int>([](rxcpp::subscriber<int> subscriber) {
        subscriber.get_subscription().add([] {
          std::cout << "Unsubscribed (refcount = " << foo::refcount << ")\n";
        });
        return subscriber;
      })
      .as_dynamic();
}

int foo::refcount = 0;

int main() {
  {
    create_foo_observable()
        .observe_on(rxcpp::observe_on_new_thread())
        .take(10)
        .as_blocking()
        .subscribe([](auto) {});
  }
  std::cout << "Final refcount is " << foo::refcount << '\n';
}

++refcount = 1
++refcount = 2
++refcount = 3
++refcount = 4
++refcount = 5
--refcount = 4
--refcount = 3
++refcount = 4
++refcount = 5
++refcount = 6
++refcount = 7
--refcount = 6
--refcount = 5
--refcount = 4
--refcount = 3
--refcount = 2
--refcount = 1
Unsubscribed (refcount = 1)
Final refcount is 1

If I remove .observe_on(rxcpp::observe_on_new_thread()) the final refcount is 0. Maybe this is not how you are supposed to create your own observables? In my real code I start up an async process using boost::asio that calls the methods on the subscriber, but the general boilerplate is about the same.

@simonvpe
Author

simonvpe commented Aug 2, 2018

However, if I use rxcpp::observable<>::create(), the result is more pleasing.

#include <rxcpp/rx.hpp>
#include <chrono>
#include <iostream>
#include <thread>

struct bar {
  static int refcount;

  explicit bar() { std::cout << "++refcount = " << ++refcount << '\n'; }
  ~bar() { std::cout << "--refcount = " << --refcount << '\n'; }
  bar(const bar &other) { std::cout << "++refcount = " << ++refcount << '\n'; }
  bar(bar &&) { std::cout << "++refcount = " << ++refcount << '\n'; }
  bar &operator=(bar &&) = default;
  bar &operator=(const bar &) = default;
};

int bar::refcount = 0;

int main() {
  {
    rxcpp::observable<>::create<int>([state = bar{}](auto subscriber) {
      for (auto i = 0; i < 100; ++i) {
        subscriber.on_next(i);
      }
      subscriber.on_completed();
    })
        .observe_on(rxcpp::observe_on_new_thread())
        .take(10)
        .as_blocking()
        .subscribe([](auto) {});
  }

  // <- Need to wait for cleanup or `bar::refcount == 1`
  using namespace std::chrono_literals;
  std::this_thread::sleep_for(100ms);

  std::cout << "Final refcount is " << bar::refcount << '\n';
}

++refcount = 1
++refcount = 2
++refcount = 3
++refcount = 4
++refcount = 5
--refcount = 4
--refcount = 3
--refcount = 2
++refcount = 3
++refcount = 4
++refcount = 5
--refcount = 4
--refcount = 3
++refcount = 4
++refcount = 5
++refcount = 6
--refcount = 5
++refcount = 6
--refcount = 5
--refcount = 4
++refcount = 5
++refcount = 6
--refcount = 5
++refcount = 6
--refcount = 5
--refcount = 4
--refcount = 3
--refcount = 2
--refcount = 1
--refcount = 0
Final refcount is 0

@kirkshoop
Member

I expect the first will get to zero if the sleep in the second is added before printing the final refcount.

The refcount waits for the observe_on to shut down.
