
[Feature request] buffer with predicate which determines when buffer must be flushed #369

Open
anatoly-spb opened this issue Mar 31, 2017 · 8 comments


@anatoly-spb
Contributor

anatoly-spb commented Mar 31, 2017

Hello!

I have a simple example with a character stream:

  auto source = rxcpp::observable<>::iterate(
    std::initializer_list<char>{ '1', '2', '3', '$', '5', '$', '7', '8', '$' }
  );

I want to get a std::vector for each subsequence terminated by '$'. I know that there is a buffer operator (http://reactivex.io/documentation/operators/buffer.html) with count and time overloads. But what about a buffer with a predicate that determines when the buffer must be flushed?
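To pin down the requested semantics, here is a minimal sketch in plain C++ over a finite sequence, without rxcpp. `buffer_if` is a hypothetical name, not an existing rxcpp operator. Each element that matches the predicate flushes the elements collected so far, and the delimiter itself is dropped. Whether a trailing partial buffer should be emitted on completion is a design choice; this sketch emits it only when it is non-empty.

```cpp
#include <utility>
#include <vector>

// Hypothetical buffer_if: split `input` into buffers, flushing whenever
// `is_flush` returns true for an element. The flushing element itself is
// not stored in any buffer.
template <class T, class Pred>
std::vector<std::vector<T>> buffer_if(const std::vector<T>& input, Pred is_flush) {
    std::vector<std::vector<T>> buffers;
    std::vector<T> current;
    for (const T& item : input) {
        if (is_flush(item)) {
            buffers.push_back(std::move(current));
            current.clear();  // reset the moved-from vector to a known empty state
        } else {
            current.push_back(item);
        }
    }
    // On completion, keep elements that were not followed by a delimiter.
    if (!current.empty()) {
        buffers.push_back(std::move(current));
    }
    return buffers;
}
```

With the input from the question, `buffer_if(input, [](char c) { return c == '$'; })` yields the buffers {1 2 3}, {5} and {7 8}.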

Now I have the following workaround:

  auto source = rxcpp::observable<>::iterate(
    std::initializer_list<char>{ '1', '2', '3', '$', '5', '$', '7', '8', '$' }
  );

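  auto vector_stream = source
    | rxcpp::operators::flat_map([](char ch) {
      // One inner observable per char. The accumulator is a function-local
      // static, so it is shared by every subscription to vector_stream.
      return rxcpp::observable<>::create<std::vector<char>>(
        [ch](rxcpp::subscriber<std::vector<char>> s) {
          static std::vector<char> a;
          if (ch == '$') {
            // delimiter: flush the accumulated chars
            s.on_next(a);
            a.clear();
          }
          else {
            a.push_back(ch);
          }
          // complete the inner observable so that flat_map can complete
          s.on_completed();
        });
    });

  vector_stream
    .subscribe([](const std::vector<char> &v) {
      std::cout << "consume vector: ";
      std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
      std::cout << std::endl;
    });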
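The workaround above keeps its accumulator in a function-local `static`, so all subscriptions share one vector. A minimal sketch of the alternative, in plain C++ without rxcpp, keeps the buffer inside an observer-like object so each consumer owns its own state. `delimited_buffer` and its `on_next`/`on_completed` methods are hypothetical names chosen to mirror Rx terminology; they are not rxcpp types.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical push-based accumulator: one instance per consumer, so no
// state is shared between independent subscriptions.
class delimited_buffer {
public:
    using sink_t = std::function<void(std::vector<char>)>;

    delimited_buffer(char delimiter, sink_t sink)
        : delimiter_(delimiter), sink_(std::move(sink)) {}

    // Called for every incoming char.
    void on_next(char ch) {
        if (ch == delimiter_) {
            sink_(std::move(buffer_));
            buffer_.clear();  // reset the moved-from buffer
        } else {
            buffer_.push_back(ch);
        }
    }

    // Called once when the source completes; flushes a trailing partial buffer.
    void on_completed() {
        if (!buffer_.empty()) {
            sink_(std::move(buffer_));
            buffer_.clear();
        }
    }

private:
    char delimiter_;
    sink_t sink_;
    std::vector<char> buffer_;
};
```

Two instances fed different chars produce independent buffers, which is the property the shared `static` cannot provide.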

I am a newcomer, so maybe you can point me to a more correct and elegant Rx way to solve my task.

Thanks in advance,
Anatoly Shirokov

@kirkshoop
Member

The window_toggle operator can be used to build this. In fact, it is used in Readme.md to implement a variation of this exact problem.

    // filter to last string in each line
    auto closes = strings |
        filter(
            [](const string& s){
                return s.back() == '\r';
            }) |
        Rx::map([](const string&){return 0;});

    // group strings by line
    auto linewindows = strings |
        window_toggle(closes | start_with(0), [=](int){return closes;});

Partially applied to this case:

  auto source = rxcpp::observable<>::iterate(
    std::initializer_list<char>{ '1', '2', '3', '$', '5', '$', '7', '8', '$' }
  ) |
  // share one subscription between flushes and windows
  publish() |
  ref_count();

  // filter to flushes
  auto flushes = source |
    filter(
      [](char c){
        return c == '$';
      }) |
    // the values here are chars, so the lambda must accept a char
    Rx::map([](char){return 0;});

  // group
  auto windows = source |
    window_toggle(flushes | start_with(0), [=](int){return flushes;});

  auto vectors = windows |
    map([](observable<char> w){
      return w |
        filter(. . .) | // filter out '$'
        reduce(. . .); // push_back each char into a vector.
    });
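The pipeline above opens a window at the start (start_with(0)) and at every '$', closes the current window at every '$', then filters out the '$' and reduces each window to a vector. The plain C++ sketch below models that shape on a finite sequence, without rxcpp. It assumes that each '$' is delivered to the window it closes before the next window opens; in the real operator, which window sees the '$' depends on subscription order, which is one reason the pipeline filters it out. `split_windows` and `window_to_vector` are hypothetical helpers.

```cpp
#include <numeric>
#include <vector>

// Model of window_toggle(flushes | start_with(0), [](int){ return flushes; }):
// one window is open from the start; each '$' is appended to the open window,
// which then closes, and a new window opens. The last window closes when the
// input ends, so input ending in '$' yields a trailing empty window.
std::vector<std::vector<char>> split_windows(const std::vector<char>& input) {
    std::vector<std::vector<char>> windows(1);  // the start_with(0) window
    for (char ch : input) {
        windows.back().push_back(ch);
        if (ch == '$') {
            windows.emplace_back();  // close the current window, open the next
        }
    }
    return windows;
}

// Model of `w | filter(ch != '$') | reduce(vector, push_back)` for one window.
std::vector<char> window_to_vector(const std::vector<char>& w) {
    return std::accumulate(w.begin(), w.end(), std::vector<char>{},
        [](std::vector<char> acc, char ch) {
            if (ch != '$') {  // filter out the delimiter
                acc.push_back(ch);
            }
            return acc;
        });
}
```

In this model the input from the question produces {1 2 3}, {5}, {7 8} and a trailing empty vector from the window opened by the last '$'.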

@anatoly-spb
Contributor Author

anatoly-spb commented Apr 3, 2017

Thank you so much, Kirk! You are a kind wizard. But I cannot make it work with an infinite character sequence:

  auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread())
    .map([](long i) {
    if (!(i % 10))
      return (char)'$';
    return (char)('A' + i);
  });
  std::mutex m;
  source.subscribe([&m](char ch) {
    std::lock_guard<std::mutex> g(m);
    std::cout << "char: " << ch << std::endl;
  });

  // filter to flushes
  auto flushes = source |
    rxcpp::operators::filter(
      [](char ch) {
    return ch == '$';
  })
    | rxcpp::operators::map([](char ch) {
    return 0;
  });

  auto windows = source |
    rxcpp::operators::window_toggle(flushes | rxcpp::operators::start_with(0), [=](int) {
    return flushes;
  });

  auto vectors = windows |
    rxcpp::operators::map([](rxcpp::observable<char> w) {
    return w |
      // filter out '$'
      rxcpp::operators::filter([](char ch) {
      return ch != '$';
    }) | 
      // reduce to vector
      rxcpp::operators::reduce(
        std::vector<char>(),
        [](std::vector<char> v, char ch) {
      v.push_back(ch);
      return v;
    }) | rxcpp::operators::as_dynamic();
  });

  vectors
    .take(3)
    .as_blocking()
    .subscribe([&m](rxcpp::observable<std::vector<char>> o) {
    o.subscribe([&m](std::vector<char> &v) {
      std::lock_guard<std::mutex> g(m);
      std::cout << "vector: ";
      std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
      std::cout << std::endl;
    });
  });

I got the following:

char: B
char: C
char: D
char: E
char: F
char: G
char: H
char: I
char: J
char: $
vector: B C D E F G H I J
vector:
char: L
char: M
char: N
char: O
char: P
char: Q
char: R
char: S
char: T
char: $

Any hints?

@kirkshoop
Member

I think the fix is to share the source:

  auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread())
    .map([](long i) {
    if (!(i % 10))
      return (char)'$';
    return (char)('A' + i);
  }) |
  // share
  publish() |
  ref_count();

This should let you remove the mutex m as well. At the moment, every subscribe to the source starts a new thread and emits all the chars on that thread. This leads to coordination issues in the window_toggle operator, since it was not given a thread-safe scheduler to coordinate the values from different threads. It also means that the '$' from one thread is being used to close the sequence coming from a different thread.
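The difference between subscribing twice to a cold source and sharing one subscription can be modeled in plain C++ without threads or rxcpp. `cold_source` and `shared_source` are hypothetical types. The cold one re-runs its producer for every subscriber, so each subscriber gets its own independent run (in the interval case, its own thread). The shared one runs the producer once and fans each value out to every registered observer. That is loosely what publish() followed by a connect does; ref_count() would instead connect automatically when the first subscriber arrives.

```cpp
#include <functional>
#include <utility>
#include <vector>

using observer_fn = std::function<void(char)>;
using producer_fn = std::function<void(const observer_fn&)>;

// Cold: every subscribe() runs the producer again.
struct cold_source {
    explicit cold_source(producer_fn p) : producer(std::move(p)) {}
    void subscribe(const observer_fn& on_next) {
        ++runs;
        producer(on_next);
    }
    producer_fn producer;
    int runs = 0;
};

// Shared: observers register first, then connect() runs the producer once
// and delivers every value to all registered observers.
struct shared_source {
    explicit shared_source(producer_fn p) : producer(std::move(p)) {}
    void subscribe(observer_fn on_next) { observers.push_back(std::move(on_next)); }
    void connect() {
        ++runs;
        producer([this](char ch) {
            for (const auto& o : observers) o(ch);
        });
    }
    producer_fn producer;
    std::vector<observer_fn> observers;
    int runs = 0;
};
```

With the shared form, the flushes and the windows observe the same run of chars, so each '$' closes the window it belongs to.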

@anatoly-spb
Contributor Author

Thank you, Kirk! So it seems that window_toggle is not suitable for this task. The following does not work:

  auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50))
    .map([](long i) {
    if (!(i % 10))
      return (char)'$';
    return (char)('A' + i);
  })
    .publish()
    .ref_count();

  // filter to flushes
  auto flushes = source |
    rxcpp::operators::filter(
      [](char ch) {
    return ch == '$';
  })
    | rxcpp::operators::map([](char ch) {
    return 0;
  });

  auto windows = source |
    rxcpp::operators::window_toggle(flushes | rxcpp::operators::start_with(0), [=](int) {
    return flushes;
  });

  auto vectors = windows |
    rxcpp::operators::map([](rxcpp::observable<char> w) {
    return w |
      // filter out '$'
      rxcpp::operators::filter([](char ch) {
      return ch != '$';
    }) | 
      // reduce to vector
      rxcpp::operators::reduce(
        std::vector<char>(),
        [](std::vector<char> v, char ch) {
      v.push_back(ch);
      return v;
    }) | rxcpp::operators::as_dynamic();
  });

  vectors
    .take(3)
    .as_blocking()
    .subscribe([](rxcpp::observable<std::vector<char>> o) {
    o.subscribe([](std::vector<char> &v) {
      std::cout << "vector: ";
      std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
      std::cout << std::endl;
    });
  });
  return;

I got only one vector, and then my program hung:

vector: B C D E F G H I J

In my opinion, the best solution in this case would be a buffer with a predicate.

@kirkshoop
Member

I finally have a fix for an async lifetime bug in window_toggle.

A few improvements to the code as well.

  • no need to use as_blocking() when no threads were added
  • use merge() instead of nesting a subscribe in on_next

  auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50))
    .map([](long i) {
    if (!(i % 10))
      return (char)'$';
    return (char)('A' + i);
  })
    .publish()
    .ref_count();

  // filter to flushes
  auto flushes = source |
    rxcpp::operators::filter(
      [](char ch) {
    return ch == '$';
  }) |
    rxcpp::operators::map([](char ) {
    return 0;
  });

  auto windows = source |
    rxcpp::operators::window_toggle(flushes | rxcpp::operators::start_with(0), [=](int) {
    return flushes;
  });

  auto vectors = windows |
    rxcpp::operators::map([](rxcpp::observable<char> w) {
    return w |
      // filter out '$'
      rxcpp::operators::filter([](char ch) {
      return ch != '$';
    }) |
      // reduce to vector
      rxcpp::operators::reduce(
        std::vector<char>(),
        [](std::vector<char> v, char ch) {
      v.push_back(ch);
      return v;
    }) |
      rxcpp::operators::as_dynamic();
  }) |
  rxcpp::operators::merge();

  vectors
    .take(3)
    .subscribe([](const std::vector<char> &v) {
      std::cout << "vector: ";
      std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
      std::cout << std::endl;
    });

with the bug fix this results in

$ ./delimited 
vector: B C D E F G H I J 
vector: L M N O P Q R S T 
vector: V W X Y Z [ \ ] ^ 
$ 

On the suggestion that a buffer with a predicate would be the best solution in this case:

Adding buffer_toggle would allow this and remove the extra reduce. There is not a lot of code difference between the window and buffer forms of the operators. I just haven't gotten around to it yet.
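A buffer with a predicate would collapse the filter and reduce steps into one operator. The sketch below models that behavior in plain C++ for the infinite source used above, without rxcpp. It assumes the tick counter starts at 1 (the output above begins with 'B', that is 'A' + 1) and stops after `count` buffers, the way take(3) does. `take_buffers_until` and `tick_to_char` are hypothetical helpers, not buffer_toggle or any other rxcpp API.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical helper: pull chars from `next` (an unbounded generator), flush
// a buffer whenever `is_flush` matches, and stop after `count` buffers,
// like take(count) on a buffered stream. The delimiter is not stored.
std::vector<std::string> take_buffers_until(const std::function<char()>& next,
                                            const std::function<bool(char)>& is_flush,
                                            std::size_t count) {
    std::vector<std::string> buffers;
    std::string current;
    while (buffers.size() < count) {
        char ch = next();
        if (is_flush(ch)) {
            buffers.push_back(current);
            current.clear();
        } else {
            current.push_back(ch);
        }
    }
    return buffers;
}

// Same mapping as the interval example: every 10th tick is '$', otherwise 'A' + i.
char tick_to_char(long i) {
    if (i % 10 == 0) return '$';
    return static_cast<char>('A' + i);
}
```

Driving it with a counter that starts at 1 reproduces the three lines printed above ("BCDEFGHIJ", "LMNOPQRST", "VWXYZ[\]^"). std::string is used for the buffers only to keep the comparison with the printed output simple; the rxcpp version produces std::vector<char>.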

@anatoly-spb
Contributor Author

Thank you so much, Kirk! This is very good news.

kirkshoop pushed a commit that referenced this issue Apr 7, 2017
@kirkshoop
Member

Thank you for pointing out the issues in window_toggle!

@tnovotny

With all respect to the 'creative solution', readable and understandable code looks different, so something like a buffer_if with a predicate seems preferable and should still be considered a reasonable feature request. The proposed solution is more of a workaround imho.
