@@ -18,8 +18,8 @@ import (
18
18
)
19
19
20
20
var addr = flag .String ("okq-addr" , "localhost:4777" , "Location of okq instance to test" )
21
+ var noBlock = flag .Bool ("no-block" , false , "Whether to set NOBLOCK when pushing events" )
21
22
var stopCh = make (chan bool )
22
- var wg sync.WaitGroup
23
23
24
24
func randString () string {
25
25
b := make ([]byte , 16 )
@@ -52,6 +52,11 @@ func main() {
52
52
triggerJobCh <- true
53
53
}
54
54
55
+ pushFlag := okq .Normal
56
+ if * noBlock {
57
+ log .Println ("using NOBLOCK" )
58
+ pushFlag = okq .NoBlock
59
+ }
55
60
for i := 0 ; i < n ; i ++ {
56
61
go func () {
57
62
cl := okq .New (* addr )
@@ -60,55 +65,35 @@ func main() {
60
65
if err != nil {
61
66
log .Fatal (err )
62
67
}
63
- if err := cl .Push (<- queueCh , string (eventB )); err != nil {
68
+ err = cl .Push (<- queueCh , string (eventB ), pushFlag )
69
+ if err != nil {
64
70
log .Fatal (err )
65
71
}
66
72
}
67
73
}()
68
74
}
69
75
70
- ch := make (chan * okq.ConsumerEvent )
76
+ fn := func (e * okq.Event ) bool {
77
+ eventB := []byte (e .Contents )
78
+ var then time.Time
79
+ if err := then .UnmarshalBinary (eventB ); err != nil {
80
+ log .Fatal (err )
81
+ }
82
+ agg .Agg ("event" , time .Since (then ).Seconds ())
83
+ triggerJobCh <- true
84
+ return true
85
+ }
86
+
71
87
var chwg sync.WaitGroup
72
88
for i := 0 ; i < n ; i ++ {
73
89
chwg .Add (1 )
74
90
go func () {
75
91
cl := okq .New (* addr )
76
- myCh := make (chan * okq.ConsumerEvent )
77
- go func () {
78
- for a := range myCh {
79
- ch <- a
80
- }
81
- chwg .Done ()
82
- }()
83
- err := cl .Consumer (myCh , stopCh , qs ... )
92
+ err := cl .Consumer (fn , stopCh , qs ... )
84
93
if err != nil {
85
94
log .Fatalf ("got error consuming: %s" , err )
86
95
}
87
96
}()
88
97
}
89
- go func () {
90
- chwg .Wait ()
91
- close (ch )
92
- }()
93
-
94
- for i := 0 ; i < n ; i ++ {
95
- go func () {
96
- wg .Add (1 )
97
- for a := range ch {
98
- a .Ack ()
99
-
100
- eventB := []byte (a .Event .Contents )
101
- var then time.Time
102
- if err := then .UnmarshalBinary (eventB ); err != nil {
103
- log .Fatal (err )
104
- }
105
- agg .Agg ("event" , time .Since (then ).Seconds ())
106
-
107
- triggerJobCh <- true
108
- }
109
- wg .Done ()
110
- }()
111
- }
112
-
113
- select {}
98
+ chwg .Wait ()
114
99
}
0 commit comments