Skip to content

Commit 115d235

Browse files
authored
Merge pull request #230 from ReactiveX/connectable
Connectable Observable (mainly)
2 parents 771d704 + 9bb30d1 commit 115d235

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+1592
-306
lines changed

README.md

+183-39
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ Reactive Extensions for the Go Language
1212

1313
ReactiveX is a new, alternative way of asynchronous programming to callbacks, promises and deferred. It is about processing streams of events or items, with events being any occurrences or changes within the system. A stream of events is called an [Observable](http://reactivex.io/documentation/contract.html).
1414

15-
An operator is basically a function that defines an Observable, how and when it should emit data. The list of operators covered is available [here](README.md#supported-operators-in-rxgo).
15+
An operator is a function that defines an Observable, how and when it should emit data. The list of operators covered is available [here](README.md#supported-operators-in-rxgo).
1616

1717
## RxGo
1818

19-
The RxGo implementation is based on the [pipelines](https://blog.golang.org/pipelines) concept. In a nutshell, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.
19+
The RxGo implementation is based on the concept of [pipelines](https://blog.golang.org/pipelines). In a nutshell, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.
2020

2121
![](doc/rx.png)
2222

@@ -48,14 +48,16 @@ go get -u github.com/reactivex/rxgo/v2
4848
Let's create our first Observable and consume an item:
4949

5050
```go
51-
observable := rxgo.Just([]rxgo.Item{rxgo.Of("Hello, World!")})
51+
observable := rxgo.Just("Hello, World!")()
5252
ch := observable.Observe()
5353
item := <-ch
5454
fmt.Println(item.V)
5555
```
5656

5757
The `Just` operator creates an Observable from a static list of items. `Of(value)` creates an item from a given value. If we want to create an item from an error, we have to use `Error(err)`. This is a difference with the v1 that was accepting directly a value or an error without having to wrap it. What's the rationale for this change? It is to prepare RxGo for the generics feature coming (hopefully) in Go 2.
5858

59+
By the way, the `Just` operator uses currying as a syntactic sugar. This way, it accepts multiple items in the first parameter list and multiple options in the second parameter list. We'll see below how to specify options.
60+
5961
Once the Observable is created, we can observe it using `Observe()`. By default, an Observable is lazy in the sense that it emits items only once a subscription is made. `Observe()` returns a `<-chan rxgo.Item`.
6062

6163
We consumed an item from this channel and printed its value of the item using `item.V`.
@@ -97,54 +99,71 @@ In this example, we passed 3 functions:
9799
<-observable.ForEach(...)
98100
```
99101

100-
### Deep Dive
102+
### Real-World Example
101103

102-
Let's implement a more complete example. We will create an Observable from a channel and implement three operators (`Map`, `Filter` and `Reduce`):
104+
Let's say we want to implement a stream that consumes the following `Customer` structure:
105+
```go
106+
type Customer struct {
107+
ID int
108+
Name, LastName string
109+
Age int
110+
TaxNumber string
111+
}
112+
```
103113

114+
We create an producer that will emit `Customer`s to a given `chan rxgo.Item` and create an Observable from it:
104115
```go
105116
// Create the input channel
106117
ch := make(chan rxgo.Item)
107-
// Create the data producer
118+
// Data producer
108119
go producer(ch)
109120

110121
// Create an Observable
111-
observable := rxgo.FromChannel(ch).
112-
Map(func(item interface{}) (interface{}, error) {
113-
if num, ok := item.(int); ok {
114-
return num * 2, nil
115-
}
116-
return nil, errors.New("input error")
117-
}).
118-
Filter(func(item interface{}) bool {
119-
return item != 4
120-
}).
121-
Reduce(func(acc interface{}, item interface{}) (interface{}, error) {
122-
if acc == nil {
123-
return 1, nil
124-
}
125-
return acc.(int) * item.(int), nil
126-
})
127-
128-
// Observe it
129-
product := <-observable.Observe()
130-
131-
fmt.Println(product)
122+
observable := rxgo.FromChannel(ch)
132123
```
133124

134-
We started by defining a `chan rxgo.Item` that will act as an input channel. We also created a `producer` goroutine that will be in charge to produce the inputs for our Observable.
125+
Then, we need to perform the two following operations:
126+
* Filter the customers whose age is below 18
127+
* Enrich each customer with a tax number. Retrieving a tax number is done for example by an IO-bound function doing an external REST call.
135128

136-
The Observable was created using `FromChannel` operator. This is basically a wrapper on top of an existing channel. Then, we implemented 3 operators:
137-
* `Map` to double each input.
138-
* `Filter` to filter items equals to 4.
139-
* `Reduce` to perform a reduction based on the product of each item.
129+
As the enriching step is IO-bound, it might be interesting to parallelize it within a given pool of goroutines.
130+
Yet, for some reason, all the `Customer` items need to be produced sequentially based on its `ID`.
140131

141-
In the end, `Observe` returns the output channel. We consume the output using the standard `<-` operator and we print the value.
142-
143-
In this example, the Observable does produce zero or one item. This is what we would expect from a basic `Reduce` operator.
144-
145-
An Observable that produces exactly one item is a Single (e.g. `Count`, `FirstOrDefault`, etc.). An Observable that produces zero or one item is called an OptionalSingle (e.g. `Reduce`, `Max`, etc.).
132+
```go
133+
observable.
134+
Filter(func(item interface{}) bool {
135+
// Filter operation
136+
customer := item.(Customer)
137+
return customer.Age > 18
138+
}).
139+
Map(func(_ context.Context, item interface{}) (interface{}, error) {
140+
// Enrich operation
141+
customer := item.(Customer)
142+
taxNumber, err := getTaxNumber(customer)
143+
if err != nil {
144+
return nil, err
145+
}
146+
customer.TaxNumber = taxNumber
147+
return customer, nil
148+
},
149+
// Create multiple instances of the map operator
150+
rxgo.WithPool(pool),
151+
// Serialize the items emitted by their Customer.ID
152+
rxgo.Serialize(func(item interface{}) int {
153+
customer := item.(Customer)
154+
return customer.ID
155+
}), rxgo.WithBufferedChannel(1))
156+
```
146157

147-
The operators producing a Single or OptionalSingle emit an item (or no item in the case of an OptionalSingle) when the stream is closed. In this example, the action that triggers the closure of the stream is when the `input` channel will be closed (most likely by the producer).
158+
In the end, we consume the items using `ForEach()` or `Observe()` for example. `Observe()` returns a `<-chan Item`:
159+
```go
160+
for customer := range observable.Observe() {
161+
if customer.Error() {
162+
return err
163+
}
164+
fmt.Println(customer)
165+
}
166+
```
148167

149168
## Observable Types
150169

@@ -263,7 +282,132 @@ observable.Map(transform, rxgo.WithPool(32))
263282

264283
In this example, we create a pool of 32 goroutines that consume items concurrently from the same channel. If the operation is CPU-bound, we can use the `WithCPUPool()` option that creates a pool based on the number of logical CPUs.
265284

266-
## Supported Operators in RxGo
285+
### Connectable Observable
286+
287+
A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect() method is called. In this way, you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.
288+
289+
Let's create a Connectable Observable using `rxgo.WithPublishStrategy`:
290+
291+
```go
292+
ch := make(chan rxgo.Item)
293+
go func() {
294+
ch <- rxgo.Of(1)
295+
ch <- rxgo.Of(2)
296+
ch <- rxgo.Of(3)
297+
close(ch)
298+
}()
299+
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())
300+
```
301+
302+
Then, we create two Observers:
303+
304+
```go
305+
observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
306+
return i.(int) + 1, nil
307+
}).DoOnNext(func(i interface{}) {
308+
fmt.Printf("First observer: %d\n", i)
309+
})
310+
311+
observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
312+
return i.(int) * 2, nil
313+
}).DoOnNext(func(i interface{}) {
314+
fmt.Printf("Second observer: %d\n", i)
315+
})
316+
```
317+
318+
If `observable` was not a Connectable Observable, as `DoOnNext` creates an Observer, the source Observable would have begun emitting items. Yet, in the case of a Connectable Observable, we have to call `Connect()`:
319+
320+
```go
321+
observable.Connect()
322+
```
323+
324+
Once `Connect()` is called, the Connectable Observable begin to emit items.
325+
326+
There is another important change with a regular Observable. A Connectable Observable publishes its items. It means, all the Observers receive a copy of the items.
327+
328+
Here is an example with a regular Observable:
329+
330+
```go
331+
ch := make(chan rxgo.Item)
332+
go func() {
333+
ch <- rxgo.Of(1)
334+
ch <- rxgo.Of(2)
335+
ch <- rxgo.Of(3)
336+
close(ch)
337+
}()
338+
// Create a regular Observable
339+
observable := rxgo.FromChannel(ch)
340+
341+
// Create the first Observer
342+
observable.DoOnNext(func(i interface{}) {
343+
fmt.Printf("First observer: %d\n", i)
344+
})
345+
346+
// Create the second Observer
347+
observable.DoOnNext(func(i interface{}) {
348+
fmt.Printf("Second observer: %d\n", i)
349+
})
350+
```
351+
352+
```
353+
First observer: 1
354+
First observer: 2
355+
First observer: 3
356+
```
357+
358+
Now, with a Connectable Observable:
359+
360+
```go
361+
ch := make(chan rxgo.Item)
362+
go func() {
363+
ch <- rxgo.Of(1)
364+
ch <- rxgo.Of(2)
365+
ch <- rxgo.Of(3)
366+
close(ch)
367+
}()
368+
// Create a Connectable Observable
369+
observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())
370+
371+
// Create the first Observer
372+
observable.DoOnNext(func(i interface{}) {
373+
fmt.Printf("First observer: %d\n", i)
374+
})
375+
376+
// Create the second Observer
377+
observable.DoOnNext(func(i interface{}) {
378+
fmt.Printf("Second observer: %d\n", i)
379+
})
380+
381+
observable.Connect()
382+
```
383+
384+
```
385+
Second observer: 1
386+
First observer: 1
387+
First observer: 2
388+
First observer: 3
389+
Second observer: 2
390+
Second observer: 3
391+
```
392+
393+
### Observable, Single and Optional Single
394+
395+
An Iterable is an object that can be observed using `Observe(opts ...Option) <-chan Item`.
396+
397+
An Iterable can be either:
398+
* An Observable: emit 0 or multiple items
399+
* A Single: emit 1 item
400+
* An Optional Single: emit 0 or 1 item
401+
402+
## Documentation
403+
404+
### Assert API
405+
406+
How to use the [assert API](doc/assert.md) to write unit tests while using RxGo.
407+
408+
### Operator Options
409+
410+
[Operator options](doc/options.md)
267411

268412
### Creating Observables
269413
* [Create](doc/create.md) — create an Observable from scratch by calling observer methods programmatically

assert.go

+3
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,9 @@ loop:
254254
if expectedError == nil {
255255
assert.Equal(t, 0, len(errs))
256256
} else {
257+
if len(errs) == 0 {
258+
assert.FailNow(t, "no error raised", "expected %v", expectedError)
259+
}
257260
assert.Equal(t, expectedError, errs[0])
258261
}
259262
}

doc/all.md

+6-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Determine whether all items emitted by an Observable meet some criteria.
99
## Example
1010

1111
```go
12-
observable := rxgo.Just([]interface{}{1, 2, 3, 4}).
12+
observable := rxgo.Just(1, 2, 3, 4)().
1313
All(func(i interface{}) bool {
1414
// Check all items are less than 10
1515
return i.(int) < 10
@@ -46,4 +46,8 @@ true
4646

4747
### WithCPUPool
4848

49-
[Detail](options.md#withcpupool)
49+
[Detail](options.md#withcpupool)
50+
51+
### WithPublishStrategy
52+
53+
[Detail](options.md#withpublishstrategy)

doc/amb.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ Given two or more source Observables, emit all of the items from only the first
1010

1111
```go
1212
observable := rxgo.Amb([]rxgo.Observable{
13-
rxgo.Just([]interface{}{1, 2, 3}),
14-
rxgo.Just([]interface{}{4, 5, 6}),
13+
rxgo.Just(1, 2, 3)(),
14+
rxgo.Just(4, 5, 6)(),
1515
})
1616
```
1717

0 commit comments

Comments
 (0)