Skip to content
This repository was archived by the owner on Aug 17, 2020. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type (
recorder *SpanRecorder
recorderFilename string
flushFrequency time.Duration
concurrencyLevel int

optionalRecorders []tracer.SpanRecorder

Expand All @@ -62,8 +63,8 @@ type (
var (
version = "0.1.13-pre5"

testingModeFrequency = time.Second
nonTestingModeFrequency = time.Minute
testingModeFrequency = time.Duration(env.ScopeTracerDispatcherHealthcheckFrequencyInTestMode.Value) * time.Millisecond
nonTestingModeFrequency = time.Duration(env.ScopeTracerDispatcherHealthcheckFrequency.Value) * time.Millisecond
)

func WithApiKey(apiKey string) Option {
Expand Down Expand Up @@ -185,6 +186,7 @@ func NewAgent(options ...Option) (*Agent, error) {
agent.userAgent = fmt.Sprintf("scope-agent-go/%s", agent.version)
agent.panicAsFail = false
agent.failRetriesCount = 0
agent.concurrencyLevel = env.ScopeTracerDispatcherConcurrencyLevel.Value

for _, opt := range options {
opt(agent)
Expand Down
111 changes: 81 additions & 30 deletions agent/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"bytes"
"compress/gzip"
"context"
"crypto/x509"
"errors"
"fmt"
Expand All @@ -16,6 +17,7 @@ import (

"github.com/google/uuid"
"github.com/vmihailenco/msgpack"
"golang.org/x/sync/semaphore"
"gopkg.in/tomb.v2"

"go.undefinedlabs.com/scopeagent/tags"
Expand All @@ -41,9 +43,11 @@ type (
payloadSpans []PayloadSpan
payloadEvents []PayloadEvent

flushFrequency time.Duration
url string
client *http.Client
flushFrequency time.Duration
concurrencyLevel int
url string
client *http.Client
s *semaphore.Weighted

logger *log.Logger
stats *RecorderStats
Expand Down Expand Up @@ -79,9 +83,13 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder {
r.metadata = agent.metadata
r.logger = agent.logger
r.flushFrequency = agent.flushFrequency
r.concurrencyLevel = agent.concurrencyLevel
r.url = agent.getUrl("api/agent/ingest")
r.client = &http.Client{}
r.stats = &RecorderStats{}
r.s = semaphore.NewWeighted(int64(r.concurrencyLevel))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use a fixed pool of go routines that read from a channel? Like https://gobyexample.com/worker-pools

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've found easier to implement (less code modification) with a semaphore... But yeah we can do it also with two channels... Do you prefer with a channel implementation?

r.logger.Printf("recorder frequency: %v", agent.flushFrequency)
r.logger.Printf("recorder concurrency level: %v", agent.concurrencyLevel)
r.t.Go(r.loop)
return r
}
Expand Down Expand Up @@ -141,48 +149,91 @@ func (r *SpanRecorder) loop() error {

// Sends the spans in the buffer to Scope
func (r *SpanRecorder) sendSpans() (error, bool) {
defer func() {
// We acquire all to ensure all previous go routines has finished before leaving this function
if r.s.Acquire(context.Background(), int64(r.concurrencyLevel)) == nil {
r.s.Release(int64(r.concurrencyLevel))
}
}()
atomic.AddInt64(&r.stats.sendSpansCalls, 1)
const batchSize = 1000

// Local mutex to synchronize go routines and avoid race conditions in lastError var
var lastErrorMutex sync.Mutex
var lastError error
getLastError := func() error {
lastErrorMutex.Lock()
defer lastErrorMutex.Unlock()
return lastError
}
setLastError := func(err error) {
lastErrorMutex.Lock()
defer lastErrorMutex.Unlock()
lastError = err
}

var shouldCancel int32

for {
spans, spMore, spTotal := r.popPayloadSpan(batchSize)
events, evMore, evTotal := r.popPayloadEvents(batchSize)

payload := map[string]interface{}{
"metadata": r.metadata,
"spans": spans,
"events": events,
tags.AgentID: r.agentId,
}
buf, err := encodePayload(payload)
// We acquire one concurrency slot, if the concurrency level has been reached, we wait until a release
err := r.s.Acquire(context.Background(), 1)
if err != nil {
atomic.AddInt64(&r.stats.sendSpansKo, 1)
atomic.AddInt64(&r.stats.spansNotSent, int64(len(spans)))
return err, false
}
// If we had acquire then a previous go routine has finished, we check if the shouldCancel has been set from previous goroutines
if atomic.LoadInt32(&shouldCancel) > 0 {
return getLastError(), true
}

var testSpans int64
for _, span := range spans {
if isTestSpan(span) {
testSpans++
go func(sp []PayloadSpan, ev []PayloadEvent, spTotalCount, evTotalCount int) {
defer r.s.Release(1)
payload := map[string]interface{}{
"metadata": r.metadata,
"spans": sp,
"events": ev,
tags.AgentID: r.agentId,
}
buf, err := encodePayload(payload)
if err != nil {
atomic.AddInt64(&r.stats.sendSpansKo, 1)
atomic.AddInt64(&r.stats.spansNotSent, int64(len(sp)))
setLastError(err)
return
}
var testSpans int64
for _, span := range sp {
if isTestSpan(span) {
testSpans++
}
}
}

r.logger.Printf("sending %d/%d spans with %d/%d events", len(spans), spTotal, len(events), evTotal)
statusCode, err := r.callIngest(buf)
if err != nil {
atomic.AddInt64(&r.stats.sendSpansKo, 1)
atomic.AddInt64(&r.stats.spansNotSent, int64(len(spans)))
atomic.AddInt64(&r.stats.testSpansNotSent, testSpans)
} else {
atomic.AddInt64(&r.stats.sendSpansOk, 1)
atomic.AddInt64(&r.stats.spansSent, int64(len(spans)))
atomic.AddInt64(&r.stats.testSpansSent, testSpans)
}
if statusCode == 401 {
return err, true
}
lastError = err
if len(sp) == 0 && len(ev) == 0 {
r.logger.Print("sending health check")
} else {
r.logger.Printf("sending %d/%d spans with %d/%d events", len(sp), spTotalCount, len(ev), evTotalCount)
}
statusCode, err := r.callIngest(buf)
if err != nil {
atomic.AddInt64(&r.stats.sendSpansKo, 1)
atomic.AddInt64(&r.stats.spansNotSent, int64(len(sp)))
atomic.AddInt64(&r.stats.testSpansNotSent, testSpans)
setLastError(err)
} else {
atomic.AddInt64(&r.stats.sendSpansOk, 1)
atomic.AddInt64(&r.stats.spansSent, int64(len(sp)))
atomic.AddInt64(&r.stats.testSpansSent, testSpans)
}
if statusCode == 401 {
atomic.AddInt32(&shouldCancel, 1)
return
}

}(spans, events, spTotal, evTotal)

if !spMore && !evMore {
break
Expand Down
43 changes: 23 additions & 20 deletions env/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,27 @@ package env
import "go.undefinedlabs.com/scopeagent/tags"

var (
ScopeDsn = newStringEnvVar("", "SCOPE_DSN")
ScopeApiKey = newStringEnvVar("", "SCOPE_APIKEY")
ScopeApiEndpoint = newStringEnvVar("https://app.scope.dev", "SCOPE_API_ENDPOINT")
ScopeService = newStringEnvVar("default", "SCOPE_SERVICE")
ScopeRepository = newStringEnvVar("", "SCOPE_REPOSITORY")
ScopeCommitSha = newStringEnvVar("", "SCOPE_COMMIT_SHA")
ScopeBranch = newStringEnvVar("", "SCOPE_BRANCH")
ScopeSourceRoot = newStringEnvVar("", "SCOPE_SOURCE_ROOT")
ScopeLoggerRoot = newStringEnvVar("", "SCOPE_LOGGER_ROOT", "SCOPE_LOG_ROOT_PATH")
ScopeDebug = newBooleanEnvVar(false, "SCOPE_DEBUG")
ScopeTracerGlobal = newBooleanEnvVar(false, "SCOPE_TRACER_GLOBAL", "SCOPE_SET_GLOBAL_TRACER")
ScopeTestingMode = newBooleanEnvVar(false, "SCOPE_TESTING_MODE")
ScopeTestingFailRetries = newIntEnvVar(0, "SCOPE_TESTING_FAIL_RETRIES")
ScopeTestingPanicAsFail = newBooleanEnvVar(false, "SCOPE_TESTING_PANIC_AS_FAIL")
ScopeConfiguration = newSliceEnvVar([]string{tags.PlatformName, tags.PlatformArchitecture, tags.GoVersion}, "SCOPE_CONFIGURATION")
ScopeMetadata = newMapEnvVar(nil, "SCOPE_METADATA")
ScopeInstrumentationHttpPayloads = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_PAYLOADS")
ScopeInstrumentationHttpStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_STACKTRACE")
ScopeInstrumentationDbStatementValues = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STATEMENT_VALUES")
ScopeInstrumentationDbStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STACKTRACE")
ScopeDsn = newStringEnvVar("", "SCOPE_DSN")
ScopeApiKey = newStringEnvVar("", "SCOPE_APIKEY")
ScopeApiEndpoint = newStringEnvVar("https://app.scope.dev", "SCOPE_API_ENDPOINT")
ScopeService = newStringEnvVar("default", "SCOPE_SERVICE")
ScopeRepository = newStringEnvVar("", "SCOPE_REPOSITORY")
ScopeCommitSha = newStringEnvVar("", "SCOPE_COMMIT_SHA")
ScopeBranch = newStringEnvVar("", "SCOPE_BRANCH")
ScopeSourceRoot = newStringEnvVar("", "SCOPE_SOURCE_ROOT")
ScopeLoggerRoot = newStringEnvVar("", "SCOPE_LOGGER_ROOT", "SCOPE_LOG_ROOT_PATH")
ScopeDebug = newBooleanEnvVar(false, "SCOPE_DEBUG")
ScopeTracerGlobal = newBooleanEnvVar(false, "SCOPE_TRACER_GLOBAL", "SCOPE_SET_GLOBAL_TRACER")
ScopeTestingMode = newBooleanEnvVar(false, "SCOPE_TESTING_MODE")
ScopeTestingFailRetries = newIntEnvVar(0, "SCOPE_INSTRUMENTATION_TESTS_FRAMEWORKS_FAIL_RETRIES", "SCOPE_TESTING_FAIL_RETRIES")
ScopeTestingPanicAsFail = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_TESTS_FRAMEWORKS_PANIC_AS_FAIL", "SCOPE_TESTING_PANIC_AS_FAIL")
ScopeConfiguration = newSliceEnvVar([]string{tags.PlatformName, tags.PlatformArchitecture, tags.GoVersion}, "SCOPE_CONFIGURATION")
ScopeMetadata = newMapEnvVar(nil, "SCOPE_METADATA")
ScopeInstrumentationHttpPayloads = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_PAYLOADS")
ScopeInstrumentationHttpStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_STACKTRACE")
ScopeInstrumentationDbStatementValues = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STATEMENT_VALUES")
ScopeInstrumentationDbStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STACKTRACE")
ScopeTracerDispatcherHealthcheckFrequency = newIntEnvVar(60000, "SCOPE_TRACER_DISPATCHER_HEALTHCHECK_FRECUENCY")
ScopeTracerDispatcherHealthcheckFrequencyInTestMode = newIntEnvVar(1000, "SCOPE_TRACER_DISPATCHER_HEALTHCHECK_FRECUENCY_IN_TESTMODE")
ScopeTracerDispatcherConcurrencyLevel = newIntEnvVar(1, "SCOPE_TRACER_DISPATCHER_CONCURRENCY_LEVEL")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, we want "1" as concurrency level in Go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless we decide new defaults for this setting? that is the default in .NET too

)
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/go-errors/errors v1.0.1
github.com/gogo/protobuf v1.3.1
github.com/google/uuid v1.1.1
github.com/kr/pretty v0.1.0 // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
Expand All @@ -18,10 +17,10 @@ require (
github.com/vmihailenco/msgpack v4.0.4+incompatible
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 // indirect
golang.org/x/net v0.0.0-20200301022130-244492dfa37a
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/grpc v1.27.1
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/src-d/go-git.v4 v4.13.1
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
)
Loading