Skip to content

Commit

Permalink
Piotr wolski/1.73 service names (#3283)
Browse files Browse the repository at this point in the history
  • Loading branch information
piochelepiotr authored Mar 11, 2025
1 parent 67efb9f commit bae123d
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 156 deletions.
3 changes: 2 additions & 1 deletion datastreams/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
package options

// CheckpointParams contains optional parameters for setting a data-streams
// checkpoint.
type CheckpointParams struct {
	// PayloadSize is the size, in bytes, of the payload being processed.
	PayloadSize int64
	// ServiceOverride, when non-empty, replaces the processor's default
	// service name for the stats produced by this checkpoint.
	ServiceOverride string
}
3 changes: 3 additions & 0 deletions internal/datastreams/pathway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestPathway(t *testing.T) {
assert.Equal(t, start, p.PathwayStart())
assert.Equal(t, end, p.EdgeStart())
assert.Equal(t, statsPoint{
serviceName: "service-1",
edgeTags: nil,
hash: hash1,
parentHash: 0,
Expand All @@ -48,6 +49,7 @@ func TestPathway(t *testing.T) {
edgeLatency: 0,
}, processor.in.poll(time.Second).point)
assert.Equal(t, statsPoint{
serviceName: "service-1",
edgeTags: []string{"topic:topic1"},
hash: hash2,
parentHash: hash1,
Expand All @@ -56,6 +58,7 @@ func TestPathway(t *testing.T) {
edgeLatency: middle.Sub(start).Nanoseconds(),
}, processor.in.poll(time.Second).point)
assert.Equal(t, statsPoint{
serviceName: "service-1",
edgeTags: []string{"topic:topic2"},
hash: hash3,
parentHash: hash2,
Expand Down
1 change: 0 additions & 1 deletion internal/datastreams/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ const (
// StatsPoint contains a set of statistics grouped under various aggregation keys.
type StatsPoint struct {
// These fields indicate the properties under which the stats were aggregated.
Service string // deprecated
EdgeTags []string
Hash uint64
ParentHash uint64
Expand Down
22 changes: 3 additions & 19 deletions internal/datastreams/payload_msgp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 57 additions & 36 deletions internal/datastreams/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type statsPoint struct {
pathwayLatency int64
edgeLatency int64
payloadSize int64
serviceName string
}

type statsGroup struct {
Expand Down Expand Up @@ -96,7 +97,6 @@ func (b bucket) export(timestampType TimestampType) StatsBucket {
stats = append(stats, StatsPoint{
PathwayLatency: pathwayLatency,
EdgeLatency: edgeLatency,
Service: s.service,
EdgeTags: s.edgeTags,
Hash: s.hash,
ParentHash: s.parentHash,
Expand Down Expand Up @@ -172,12 +172,17 @@ type kafkaOffset struct {
timestamp int64
}

// bucketKey identifies one aggregation bucket: stats are kept separately
// per service name and per bucket start time.
type bucketKey struct {
	serviceName string
	btime       int64 // bucket start timestamp in nanoseconds, aligned via alignTs
}

type Processor struct {
in *fastQueue
hashCache *hashCache
inKafka chan kafkaOffset
tsTypeCurrentBuckets map[int64]bucket
tsTypeOriginBuckets map[int64]bucket
tsTypeCurrentBuckets map[bucketKey]bucket
tsTypeOriginBuckets map[bucketKey]bucket
wg sync.WaitGroup
stopped uint64
stop chan struct{} // closing this channel triggers shutdown
Expand Down Expand Up @@ -205,8 +210,8 @@ func NewProcessor(statsd internal.StatsdClient, env, service, version string, ag
service = defaultServiceName
}
p := &Processor{
tsTypeCurrentBuckets: make(map[int64]bucket),
tsTypeOriginBuckets: make(map[int64]bucket),
tsTypeCurrentBuckets: make(map[bucketKey]bucket),
tsTypeOriginBuckets: make(map[bucketKey]bucket),
hashCache: newHashCache(),
in: newFastQueue(),
stopped: 1,
Expand All @@ -224,16 +229,17 @@ func NewProcessor(statsd internal.StatsdClient, env, service, version string, ag
// alignTs truncates ts down to the start of the bucket of size bucketSize
// that contains it, i.e. the largest multiple of bucketSize not above ts.
func alignTs(ts, bucketSize int64) int64 {
	return ts - (ts % bucketSize)
}

func (p *Processor) getBucket(btime int64, buckets map[int64]bucket) bucket {
b, ok := buckets[btime]
func (p *Processor) getBucket(btime int64, service string, buckets map[bucketKey]bucket) bucket {
k := bucketKey{serviceName: service, btime: btime}
b, ok := buckets[k]
if !ok {
b = newBucket(uint64(btime), uint64(bucketDuration.Nanoseconds()))
buckets[btime] = b
buckets[k] = b
}
return b
}
func (p *Processor) addToBuckets(point statsPoint, btime int64, buckets map[int64]bucket) {
b := p.getBucket(btime, buckets)
func (p *Processor) addToBuckets(point statsPoint, btime int64, buckets map[bucketKey]bucket) {
b := p.getBucket(btime, point.serviceName, buckets)
group, ok := b.points[point.hash]
if !ok {
group = statsGroup{
Expand Down Expand Up @@ -267,7 +273,7 @@ func (p *Processor) add(point statsPoint) {

func (p *Processor) addKafkaOffset(o kafkaOffset) {
btime := alignTs(o.timestamp, bucketDuration.Nanoseconds())
b := p.getBucket(btime, p.tsTypeCurrentBuckets)
b := p.getBucket(btime, p.service, p.tsTypeCurrentBuckets)
if o.offsetType == produceOffset {
b.latestProduceOffsets[partitionKey{
partition: o.partition,
Expand Down Expand Up @@ -392,44 +398,54 @@ func (p *Processor) reportStats() {
}
}

func (p *Processor) flushBucket(buckets map[int64]bucket, bucketStart int64, timestampType TimestampType) StatsBucket {
bucket := buckets[bucketStart]
delete(buckets, bucketStart)
func (p *Processor) flushBucket(buckets map[bucketKey]bucket, bucketKey bucketKey, timestampType TimestampType) StatsBucket {
bucket := buckets[bucketKey]
delete(buckets, bucketKey)
return bucket.export(timestampType)
}

func (p *Processor) flush(now time.Time) StatsPayload {
func (p *Processor) flush(now time.Time) map[string]StatsPayload {
nowNano := now.UnixNano()
sp := StatsPayload{
Service: p.service,
Version: p.version,
Env: p.env,
Lang: "go",
TracerVersion: version.Tag,
Stats: make([]StatsBucket, 0, len(p.tsTypeCurrentBuckets)+len(p.tsTypeOriginBuckets)),
}
for ts := range p.tsTypeCurrentBuckets {
if ts > nowNano-bucketDuration.Nanoseconds() {
payloads := make(map[string]StatsPayload)
addBucket := func(service string, bucket StatsBucket) {
payload, ok := payloads[service]
if !ok {
payload = StatsPayload{
Service: service,
Version: p.version,
Env: p.env,
Lang: "go",
TracerVersion: version.Tag,
Stats: make([]StatsBucket, 0, 1),
}
}
payload.Stats = append(payload.Stats, bucket)
payloads[service] = payload
}
for bucketKey := range p.tsTypeCurrentBuckets {
if bucketKey.btime > nowNano-bucketDuration.Nanoseconds() {
// do not flush the bucket at the current time
continue
}
sp.Stats = append(sp.Stats, p.flushBucket(p.tsTypeCurrentBuckets, ts, TimestampTypeCurrent))
addBucket(bucketKey.serviceName, p.flushBucket(p.tsTypeCurrentBuckets, bucketKey, TimestampTypeCurrent))
}
for ts := range p.tsTypeOriginBuckets {
if ts > nowNano-bucketDuration.Nanoseconds() {
for bucketKey := range p.tsTypeOriginBuckets {
if bucketKey.btime > nowNano-bucketDuration.Nanoseconds() {
// do not flush the bucket at the current time
continue
}
sp.Stats = append(sp.Stats, p.flushBucket(p.tsTypeOriginBuckets, ts, TimestampTypeOrigin))
addBucket(bucketKey.serviceName, p.flushBucket(p.tsTypeOriginBuckets, bucketKey, TimestampTypeOrigin))
}
return sp
return payloads
}

func (p *Processor) sendToAgent(payload StatsPayload) {
atomic.AddInt64(&p.stats.flushedPayloads, 1)
atomic.AddInt64(&p.stats.flushedBuckets, int64(len(payload.Stats)))
if err := p.transport.sendPipelineStats(&payload); err != nil {
atomic.AddInt64(&p.stats.flushErrors, 1)
func (p *Processor) sendToAgent(payloads map[string]StatsPayload) {
for _, payload := range payloads {
atomic.AddInt64(&p.stats.flushedPayloads, 1)
atomic.AddInt64(&p.stats.flushedBuckets, int64(len(payload.Stats)))
if err := p.transport.sendPipelineStats(&payload); err != nil {
atomic.AddInt64(&p.stats.flushErrors, 1)
}
}
}

Expand All @@ -448,12 +464,17 @@ func (p *Processor) SetCheckpointWithParams(ctx context.Context, params options.
edgeStart = parent.EdgeStart()
parentHash = parent.GetHash()
}
service := p.service
if params.ServiceOverride != "" {
service = params.ServiceOverride
}
child := Pathway{
hash: p.hashCache.get(p.service, p.env, edgeTags, parentHash),
hash: p.hashCache.get(service, p.env, edgeTags, parentHash),
pathwayStart: pathwayStart,
edgeStart: now,
}
dropped := p.in.push(&processorInput{typ: pointTypeStats, point: statsPoint{
serviceName: service,
edgeTags: edgeTags,
parentHash: parentHash,
hash: child.hash,
Expand Down
Loading

0 comments on commit bae123d

Please sign in to comment.