Skip to content
This repository was archived by the owner on Aug 17, 2020. It is now read-only.

Commit 3a4a84c

Browse files
committed
recorder concurrency support using channels.
1 parent 13bbbe2 commit 3a4a84c

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

agent/recorder.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder {
101101
r.stats = &RecorderStats{}
102102
r.logger.Printf("recorder frequency: %v", agent.flushFrequency)
103103
r.logger.Printf("recorder concurrency level: %v", agent.concurrencyLevel)
104-
105104
// start workers
106105
r.workerJobs = make(chan *workerJob, r.concurrencyLevel)
107106
r.workerResults = make(chan *workerResult, r.concurrencyLevel)
@@ -129,8 +128,18 @@ func (r *SpanRecorder) RecordSpan(span tracer.RawSpan) {
129128

130129
func (r *SpanRecorder) loop() error {
131130
defer func() {
131+
close(r.workerJobs)
132+
close(r.workerResults)
132133
r.logger.Println("recorder has been stopped.")
133134
}()
135+
136+
// start workers
137+
r.workerJobs = make(chan *workerJob, r.concurrencyLevel)
138+
r.workerResults = make(chan *workerResult, r.concurrencyLevel)
139+
for i := 0; i < r.concurrencyLevel; i++ {
140+
go r.worker(i + 1)
141+
}
142+
134143
ticker := time.NewTicker(1 * time.Second)
135144
cTime := time.Now()
136145
for {

0 commit comments

Comments
 (0)