Skip to content

Commit c3bed5a

Browse files
committed
Add weather example
1 parent 0d3cae4 commit c3bed5a

File tree

122 files changed

+16799
-766
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

122 files changed

+16799
-766
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@
22
coverage.out
33
examples/pool/worker/worker
44
examples/streaming/single-reader/ponos.code-workspace
5+
examples/weather/bin/poller
6+
examples/weather/bin/forecaster

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,14 @@ flowchart LR
100100

101101
See the [pool package README](pool/README.md) for more details.
102102

103+
## Examples
104+
105+
See the [examples](examples) directory for examples of how to use the packages
106+
in this repository.
107+
108+
If you are looking for a more complete example of how to use Ponos to build a
109+
distributed system, check out the [weather](examples/weather) example.
110+
103111
## License
104112

105113
Ponos is licensed under the MIT license. See [LICENSE](LICENSE) for the full

examples/pool/worker/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func main() {
6565
}
6666

6767
// Start starts an execution.
68-
func (w *JobHandler) Start(ctx context.Context, job *pool.Job) error {
68+
func (w *JobHandler) Start(job *pool.Job) error {
6969
w.lock.Lock()
7070
defer w.lock.Unlock()
7171
exec := &Execution{c: make(chan struct{})}
@@ -75,7 +75,7 @@ func (w *JobHandler) Start(ctx context.Context, job *pool.Job) error {
7575
}
7676

7777
// Stop stops an execution.
78-
func (w *JobHandler) Stop(ctx context.Context, key string) error {
78+
func (w *JobHandler) Stop(key string) error {
7979
w.lock.Lock()
8080
defer w.lock.Unlock()
8181
exec, ok := w.executions[key]
@@ -91,7 +91,7 @@ func (w *JobHandler) Stop(ctx context.Context, key string) error {
9191
}
9292

9393
// Print notification.
94-
func (w *JobHandler) HandleNotification(ctx context.Context, key string, payload []byte) error {
94+
func (w *JobHandler) HandleNotification(key string, payload []byte) error {
9595
fmt.Printf(">> Notification: %s\n", string(payload))
9696
return nil
9797
}

examples/streaming/multi-readers/main.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/redis/go-redis/v9"
1010
"goa.design/ponos/streaming"
11+
"goa.design/ponos/streaming/options"
1112
)
1213

1314
func main() {
@@ -21,7 +22,7 @@ func main() {
2122
}
2223

2324
// Create stream
24-
stream, err := streaming.NewStream(ctx, "multireaders", rdb)
25+
stream, err := streaming.NewStream("multireaders", rdb)
2526
if err != nil {
2627
panic(err)
2728
}
@@ -45,8 +46,8 @@ func main() {
4546
// Create reader1 for stream that reads from the beginning and
4647
// waits for events for up to 100ms
4748
reader1, err := stream.NewReader(ctx,
48-
streaming.WithReaderStartAtOldest(),
49-
streaming.WithReaderBlockDuration(100*time.Millisecond))
49+
options.WithReaderStartAtOldest(),
50+
options.WithReaderBlockDuration(100*time.Millisecond))
5051
if err != nil {
5152
panic(err)
5253
}
@@ -61,8 +62,8 @@ func main() {
6162
// Create other reader for stream and start reading after first
6263
// event
6364
reader2, err := stream.NewReader(ctx,
64-
streaming.WithReaderStartAfter(ev.ID),
65-
streaming.WithReaderBlockDuration(100*time.Millisecond))
65+
options.WithReaderStartAfter(ev.ID),
66+
options.WithReaderBlockDuration(100*time.Millisecond))
6667
if err != nil {
6768
panic(err)
6869
}

examples/streaming/multi-sinks/main.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/redis/go-redis/v9"
1010
"goa.design/ponos/streaming"
11+
"goa.design/ponos/streaming/options"
1112
)
1213

1314
// NOTE: the example below does not handle errors for brevity.
@@ -22,7 +23,7 @@ func main() {
2223
}
2324

2425
// Create stream
25-
stream, err := streaming.NewStream(ctx, "multisinks", rdb)
26+
stream, err := streaming.NewStream("multisinks", rdb)
2627
if err != nil {
2728
panic(err)
2829
}
@@ -46,8 +47,8 @@ func main() {
4647
// Create sink that reads from the beginning and waits for events for up
4748
// to 100ms
4849
sink1, err := stream.NewSink(ctx, "multisinks-sink1",
49-
streaming.WithSinkStartAtOldest(),
50-
streaming.WithSinkBlockDuration(100*time.Millisecond))
50+
options.WithSinkStartAtOldest(),
51+
options.WithSinkBlockDuration(100*time.Millisecond))
5152
if err != nil {
5253
panic(err)
5354
}
@@ -64,8 +65,8 @@ func main() {
6465

6566
// Create sink and start reading after first event
6667
sink2, err := stream.NewSink(ctx, "multisinks-sink2",
67-
streaming.WithSinkStartAfter(ev.ID),
68-
streaming.WithSinkBlockDuration(100*time.Millisecond))
68+
options.WithSinkStartAfter(ev.ID),
69+
options.WithSinkBlockDuration(100*time.Millisecond))
6970
if err != nil {
7071
panic(err)
7172
}

examples/streaming/multi-streams/main.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/redis/go-redis/v9"
1010
"goa.design/ponos/streaming"
11+
"goa.design/ponos/streaming/options"
1112
)
1213

1314
// Note: the example below does not handle errors for brevity.
@@ -22,7 +23,7 @@ func main() {
2223
}
2324

2425
// Create stream
25-
stream1, err := streaming.NewStream(ctx, "multistreams-stream1", rdb)
26+
stream1, err := streaming.NewStream("multistreams-stream1", rdb)
2627
if err != nil {
2728
panic(err)
2829
}
@@ -32,8 +33,8 @@ func main() {
3233

3334
// Create sink
3435
sink, err := stream1.NewSink(ctx, "multistreams-sink",
35-
streaming.WithSinkStartAtOldest(),
36-
streaming.WithSinkBlockDuration(100*time.Millisecond))
36+
options.WithSinkStartAtOldest(),
37+
options.WithSinkBlockDuration(100*time.Millisecond))
3738
if err != nil {
3839
panic(err)
3940
}
@@ -52,7 +53,7 @@ func main() {
5253
fmt.Printf("event 1 id: %s\n", id1)
5354

5455
// Create second stream
55-
stream2, err := streaming.NewStream(ctx, "multistreams-stream2", rdb)
56+
stream2, err := streaming.NewStream("multistreams-stream2", rdb)
5657
if err != nil {
5758
panic(err)
5859
}

examples/streaming/pub-sub/main.go

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/redis/go-redis/v9"
1010
"goa.design/ponos/streaming"
11+
"goa.design/ponos/streaming/options"
1112
)
1213

1314
// NOTE: the example below does not handle errors for brevity.
@@ -20,7 +21,7 @@ func main() {
2021
}
2122

2223
// Create stream
23-
stream, err := streaming.NewStream(ctx, "pubsub-stream", rdb)
24+
stream, err := streaming.NewStream("pubsub-stream", rdb)
2425
if err != nil {
2526
panic(err)
2627
}
@@ -32,14 +33,14 @@ func main() {
3233
id1, err := stream.Add(ctx,
3334
"event 1",
3435
[]byte("payload 1"),
35-
streaming.WithTopic("my-topic"))
36+
options.WithTopic("my-topic"))
3637
if err != nil {
3738
panic(err)
3839
}
3940
fmt.Printf("event 1 id: %s\n", id1)
4041

4142
// Add a new event to topic "other-topic"
42-
id2, err := stream.Add(ctx, "event 2", []byte("payload 2"), streaming.WithTopic("other-topic"))
43+
id2, err := stream.Add(ctx, "event 2", []byte("payload 2"), options.WithTopic("other-topic"))
4344
if err != nil {
4445
panic(err)
4546
}
@@ -48,8 +49,8 @@ func main() {
4849
// Create sink that reads from the beginning and waits for events for up
4950
// to 100ms
5051
sink, err := stream.NewSink(ctx, "pubsub-sink",
51-
streaming.WithSinkStartAtOldest(),
52-
streaming.WithSinkBlockDuration(100*time.Millisecond))
52+
options.WithSinkStartAtOldest(),
53+
options.WithSinkBlockDuration(100*time.Millisecond))
5354
if err != nil {
5455
panic(err)
5556
}
@@ -74,9 +75,9 @@ func main() {
7475
// Create reader that reads from the beginning, waits for events for up
7576
// to 100ms and only reads events whose topic match the pattern "my-*"
7677
reader, err := stream.NewReader(ctx,
77-
streaming.WithReaderStartAtOldest(),
78-
streaming.WithReaderBlockDuration(100*time.Millisecond),
79-
streaming.WithReaderTopicPattern("my-*"))
78+
options.WithReaderStartAtOldest(),
79+
options.WithReaderBlockDuration(100*time.Millisecond),
80+
options.WithReaderTopicPattern("my-*"))
8081
if err != nil {
8182
panic(err)
8283
}
@@ -87,24 +88,4 @@ func main() {
8788
// Read event from topic "my-topic"
8889
event = <-reader.Subscribe()
8990
fmt.Printf("reader topic pattern: my-*, topic: %s, event: %s, payload: %s\n", event.Topic, event.EventName, event.Payload)
90-
91-
// Create reader for stream "my-stream" that reads from the beginning,
92-
// waits for events for up to 100ms and only reads events whose topic
93-
// match the given custom filter.
94-
reader2, err := stream.NewReader(ctx,
95-
streaming.WithReaderStartAtOldest(),
96-
streaming.WithReaderBlockDuration(100*time.Millisecond),
97-
streaming.WithReaderEventMatcher(func(event *streaming.Event) bool {
98-
return event.Topic == "my-topic"
99-
}))
100-
if err != nil {
101-
panic(err)
102-
}
103-
104-
// Don't forget to close the reader when done
105-
defer reader2.Close()
106-
107-
// Read event from topic "my-topic"
108-
event = <-reader2.Subscribe()
109-
fmt.Printf("reader topic filter: my-topic, topic: %s, event: %s, payload: %s\n", event.Topic, event.EventName, event.Payload)
11091
}

examples/streaming/single-reader/main.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/redis/go-redis/v9"
1010
"goa.design/ponos/streaming"
11+
"goa.design/ponos/streaming/options"
1112
)
1213

1314
func main() {
@@ -19,7 +20,7 @@ func main() {
1920
}
2021

2122
// Create stream
22-
stream, err := streaming.NewStream(ctx, "sr-stream", rdb)
23+
stream, err := streaming.NewStream("sr-stream", rdb)
2324
if err != nil {
2425
panic(err)
2526
}
@@ -37,8 +38,8 @@ func main() {
3738
// Create reader that reads from the beginning and waits for
3839
// events for up to 100ms
3940
reader, err := stream.NewReader(ctx,
40-
streaming.WithReaderStartAtOldest(),
41-
streaming.WithReaderBlockDuration(100*time.Millisecond))
41+
options.WithReaderStartAtOldest(),
42+
options.WithReaderBlockDuration(100*time.Millisecond))
4243
if err != nil {
4344
panic(err)
4445
}

examples/streaming/single-sink/main.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/redis/go-redis/v9"
1010
"goa.design/ponos/streaming"
11+
"goa.design/ponos/streaming/options"
1112
)
1213

1314
func main() {
@@ -19,7 +20,7 @@ func main() {
1920
}
2021

2122
// Create stream
22-
stream, err := streaming.NewStream(ctx, "singlesink-stream", rdb)
23+
stream, err := streaming.NewStream("singlesink-stream", rdb)
2324
if err != nil {
2425
panic(err)
2526
}
@@ -37,8 +38,8 @@ func main() {
3738
// Create sink that reads from the beginning and waits for events
3839
// for up to 100ms
3940
sink, err := stream.NewSink(ctx, "singlesink-sink",
40-
streaming.WithSinkStartAtOldest(),
41-
streaming.WithSinkBlockDuration(100*time.Millisecond))
41+
options.WithSinkStartAtOldest(),
42+
options.WithSinkBlockDuration(100*time.Millisecond))
4243
if err != nil {
4344
panic(err)
4445
}

examples/weather/Procfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
forecaster: bin/forecaster
2+
poller: bin/poller

0 commit comments

Comments
 (0)