Skip to content

Commit 4311196

Browse files
committed
docs: refactor README and add producer-consumer implementation examples
- Update README title from "Example with server and client" to "Producer-Consumer" - Replace server and client build instructions with detailed producer and consumer implementation examples - Demonstrate setting up a Redis worker and queue for task management fix golang-queue/queue#115 Signed-off-by: appleboy <[email protected]>
1 parent a365dbd commit 4311196

File tree

1 file changed

+140
-17
lines changed

1 file changed

+140
-17
lines changed

_example/producer-consumer/README.md

Lines changed: 140 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,152 @@
1-
# Example with server and client
1+
# Producer-Consumer
22

3-
Please refer the following steps to build server and client.
3+
This example demonstrates how to use the `Producer` and `Consumer` classes to create a simple producer-consumer system.
44

5-
## Build server
5+
## Producer
66

7-
```sh
8-
go build -o app server/main.go
9-
```
7+
The producer is responsible for creating tasks and adding them to the queue. Below is an example of a producer implementation:
108

11-
## Build client
9+
```go
10+
package main
1211

13-
```sh
14-
go build -o agent client/main.go
15-
```
12+
import (
13+
"encoding/json"
14+
"fmt"
15+
"log"
16+
"time"
17+
18+
"github.com/golang-queue/queue"
19+
"github.com/golang-queue/redisdb-stream"
20+
)
21+
22+
type job struct {
23+
Message string
24+
}
25+
26+
func (j *job) Bytes() []byte {
27+
b, err := json.Marshal(j)
28+
if err != nil {
29+
panic(err)
30+
}
31+
return b
32+
}
1633

17-
## Usage
34+
func main() {
35+
taskN := 5
1836

19-
Run the multiple agent. (open two console in the same terminal)
37+
// define the worker
38+
w := redisdb.NewWorker(
39+
redisdb.WithAddr("127.0.0.1:6379"),
40+
redisdb.WithStreamName("foobar"),
41+
)
2042

21-
```sh
22-
./agent
43+
// define the queue
44+
q := queue.NewPool(
45+
0,
46+
queue.WithWorker(w),
47+
)
48+
49+
// assign tasks in queue
50+
for i := 0; i < taskN; i++ {
51+
go func(i int) {
52+
if err := q.Queue(&job{
53+
Message: fmt.Sprintf("handle the job: %d", i+1),
54+
}); err != nil {
55+
log.Fatal(err)
56+
}
57+
}(i)
58+
}
59+
60+
time.Sleep(2 * time.Second)
61+
// shutdown the service and notify all the worker
62+
q.Release()
63+
}
2364
```
2465

25-
Publish the message.
66+
## Consumer
67+
68+
The consumer is responsible for processing the tasks. Below is an example of a consumer implementation:
69+
70+
```go
71+
package main
72+
73+
import (
74+
"context"
75+
"encoding/json"
76+
"fmt"
77+
"time"
2678

27-
```sh
28-
./app
79+
"github.com/appleboy/graceful"
80+
"github.com/golang-queue/queue"
81+
"github.com/golang-queue/queue/core"
82+
"github.com/golang-queue/redisdb-stream"
83+
)
84+
85+
type job struct {
86+
Message string
87+
}
88+
89+
func (j *job) Bytes() []byte {
90+
b, err := json.Marshal(j)
91+
if err != nil {
92+
panic(err)
93+
}
94+
return b
95+
}
96+
97+
func main() {
98+
taskN := 10000
99+
rets := make(chan string, taskN)
100+
101+
m := graceful.NewManager()
102+
103+
// define the worker
104+
w := redisdb.NewWorker(
105+
redisdb.WithAddr("127.0.0.1:6379"),
106+
redisdb.WithStreamName("foobar"),
107+
redisdb.WithGroup("foobar"),
108+
redisdb.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
109+
var v *job
110+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
111+
return err
112+
}
113+
rets <- v.Message
114+
time.Sleep(2 * time.Second)
115+
return nil
116+
}),
117+
)
118+
119+
// define the queue
120+
q := queue.NewPool(
121+
1,
122+
queue.WithWorker(w),
123+
)
124+
125+
m.AddRunningJob(func(ctx context.Context) error {
126+
for {
127+
select {
128+
case <-ctx.Done():
129+
select {
130+
case m := <-rets:
131+
fmt.Println("message:", m)
132+
default:
133+
}
134+
return nil
135+
case m := <-rets:
136+
fmt.Println("message:", m)
137+
time.Sleep(50 * time.Millisecond)
138+
}
139+
}
140+
})
141+
142+
m.AddShutdownJob(func() error {
143+
// shutdown the service and notify all the worker
144+
q.Release()
145+
return nil
146+
})
147+
148+
<-m.Done()
149+
}
29150
```
151+
152+
This example demonstrates how to set up a producer-consumer system using the `Producer` and `Consumer` classes. The producer creates tasks and adds them to the queue, while the consumer processes the tasks from the queue.

0 commit comments

Comments
 (0)