Skip to content

Commit

Permalink
Support to report trace results to datadog agent (#18)
Browse files Browse the repository at this point in the history
* Support report to datadog agent

Signed-off-by: Zhenchi <[email protected]>

* format

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Dec 1, 2020
1 parent 5931332 commit 1effc1d
Show file tree
Hide file tree
Showing 15 changed files with 1,061 additions and 506 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ package main
import (
"context"
"fmt"
"github.com/tikv/minitrace-go"
"strconv"

"github.com/tikv/minitrace-go"
)

func tracedFunc(ctx context.Context, event string) {
Expand Down
108 changes: 54 additions & 54 deletions buffer_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,79 +14,79 @@
package minitrace

import (
"sync"
"sync"
)

const POW = 8

type buffer struct {
array [1 << POW]Span
next *buffer
array [1 << POW]Span
next *buffer
}

type bufferList struct {
head *buffer
tail *buffer
collected bool
len int
head *buffer
tail *buffer
collected bool
len int
}

var bufferPool = &sync.Pool{New: func() interface{} { return &buffer{} }}

func newBufferList() *bufferList {
// Fetch a help header. It doesn't store any value.
n := bufferPool.Get().(*buffer)

return &bufferList{
n,
n,
false,
0,
}
// Fetch a help header. It doesn't store any value.
n := bufferPool.Get().(*buffer)

return &bufferList{
n,
n,
false,
0,
}
}

func (bl *bufferList) slot() *Span {
idx := bl.len & ((1 << POW) - 1)
if idx == 0 {
n := bufferPool.Get().(*buffer)
idx := bl.len & ((1 << POW) - 1)
if idx == 0 {
n := bufferPool.Get().(*buffer)

bl.tail.next = n
bl.tail = n
}
bl.tail.next = n
bl.tail = n
}

bl.len += 1
return &bl.tail.array[idx]
bl.len += 1
return &bl.tail.array[idx]
}

func (bl *bufferList) collect() []Span {
if bl.collected {
return nil
} else {
bl.collected = true
}

h := bl.head.next
bufferPool.Put(bl.head)
bl.head = nil

res := make([]Span, bl.len, bl.len)

remainingLen := bl.len
sizePerBuffer := 1 << POW
for remainingLen > sizePerBuffer {
cursor := bl.len - remainingLen
copy(res[cursor:cursor+sizePerBuffer], h.array[:])
h.array = [256]Span{}
remainingLen -= sizePerBuffer
n := h.next
bufferPool.Put(h)
h = n
}

cursor := bl.len - remainingLen
copy(res[cursor:], h.array[:remainingLen])
h.array = [256]Span{}
bufferPool.Put(h)

return res
if bl.collected {
return nil
} else {
bl.collected = true
}

h := bl.head.next
bufferPool.Put(bl.head)
bl.head = nil

res := make([]Span, bl.len, bl.len)

remainingLen := bl.len
sizePerBuffer := 1 << POW
for remainingLen > sizePerBuffer {
cursor := bl.len - remainingLen
copy(res[cursor:cursor+sizePerBuffer], h.array[:])
h.array = [256]Span{}
remainingLen -= sizePerBuffer
n := h.next
bufferPool.Put(h)
h = n
}

cursor := bl.len - remainingLen
copy(res[cursor:], h.array[:remainingLen])
h.array = [256]Span{}
bufferPool.Put(h)

return res
}
56 changes: 28 additions & 28 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,65 +14,65 @@
package minitrace

import (
"context"
"sync"
"time"
"context"
"sync"
"time"
)

// A span context embedded into ctx.context
type spanContext struct {
parent context.Context
parent context.Context

tracingContext *tracingContext
tracingContext *tracingContext

// A "goroutine-local" span collection
tracedSpans *localSpans
// A "goroutine-local" span collection
tracedSpans *localSpans

// Used to build parent-child relation between spans
currentSpanId uint32
// Used to build parent-child relation between spans
currentSpanId uint32

// Used to check if the new span is created at another goroutine
currentGid int64
// Used to check if the new span is created at another goroutine
currentGid int64

createUnixTimeNs uint64
createMonoTimeNs uint64
createUnixTimeNs uint64
createMonoTimeNs uint64
}

type tracingKey struct{}

var activeTracingKey = tracingKey{}

func (s spanContext) Deadline() (deadline time.Time, ok bool) {
return s.parent.Deadline()
return s.parent.Deadline()
}

func (s spanContext) Done() <-chan struct{} {
return s.parent.Done()
return s.parent.Done()
}

func (s spanContext) Err() error {
return s.parent.Err()
return s.parent.Err()
}

func (s spanContext) Value(key interface{}) interface{} {
if key == activeTracingKey {
return s
} else {
return s.parent.Value(key)
}
if key == activeTracingKey {
return s
} else {
return s.parent.Value(key)
}
}

// Represents a per goroutine buffer
type localSpans struct {
spans *bufferList
refCount int
spans *bufferList
refCount int
}

type tracingContext struct {
traceId uint64
traceId uint64

mu sync.Mutex
collectedSpans []Span
attachment interface{}
collected bool
mu sync.Mutex
collectedSpans []Span
attachment interface{}
collected bool
}
74 changes: 74 additions & 0 deletions datadog/datadog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package datadog

import (
"fmt"
"io"
"net/http"

"github.com/tikv/minitrace-go"
"github.com/tinylib/msgp/msgp"
)

func Send(buf io.Reader, agent string) error {
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/v0.4/traces", agent), buf)
if err != nil {
return fmt.Errorf("cannot create http request: %v", err)
}

req.Header.Set("Datadog-Meta-Tracer-Version", "v1.27.0")
req.Header.Set("Content-Type", "application/msgpack")

httpClient := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
}
response, err := httpClient.Do(req)
if err != nil {
return err
}
if code := response.StatusCode; code >= 400 {
msg := make([]byte, 1000)
n, _ := response.Body.Read(msg)
response.Body.Close()
txt := http.StatusText(code)
if n > 0 {
return fmt.Errorf("%s (Status: %s)", msg[:n], txt)
}
return fmt.Errorf("%s", txt)
}
return nil
}

func MessagePackEncode(
buf io.Writer,
serviceName string,
traceId uint64,
spanIdPrefix uint32,
spans []minitrace.Span,
) error {
spanList := miniSpansToDdSpanList(serviceName, traceId, spanIdPrefix, spans)

if _, err := buf.Write([]byte{145}); err != nil {
return err
}

if err := msgp.Encode(buf, spanList); err != nil {
return err
}

return nil
}
71 changes: 71 additions & 0 deletions datadog/datadog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package datadog

import (
"bytes"
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"

"github.com/tikv/minitrace-go"
)

func TestDatadog(t *testing.T) {
ctx, handle := minitrace.StartRootSpan(context.Background(), "root", 10010, nil)
handle.AddProperty("event1", "root")
handle.AddProperty("event2", "root")
var wg sync.WaitGroup

for i := 1; i < 5; i++ {
ctx, handle := minitrace.StartSpanWithContext(ctx, fmt.Sprintf("span%d", i))
handle.AddProperty("event1", fmt.Sprintf("span%d", i))
handle.AddProperty("event2", fmt.Sprintf("span%d", i))
wg.Add(1)
go func(prefix int) {
ctx, handle := minitrace.StartSpanWithContext(ctx, fmt.Sprintf("span%d", prefix))
handle.AddProperty("event1", fmt.Sprintf("span%d", prefix))
handle.AddProperty("event2", fmt.Sprintf("span%d", prefix))
for i := 0; i < 5; i++ {
wg.Add(1)
go func(prefix int) {
handle := minitrace.StartSpan(ctx, fmt.Sprintf("span%d", prefix))
handle.AddProperty("event1", fmt.Sprintf("span%d", prefix))
handle.AddProperty("event2", fmt.Sprintf("span%d", prefix))
handle.AddProperty("event3", fmt.Sprintf("span%d", prefix))
handle.Finish()
wg.Done()
}((prefix + i) * 10)
}
handle.Finish()
wg.Done()
}(i * 10)
handle.Finish()
}

wg.Wait()
spans, _ := handle.Collect()

rand.Seed(time.Now().UnixNano())

buf := bytes.NewBuffer([]byte{})
if err := MessagePackEncode(buf, "datadog-test", rand.Uint64(), rand.Uint32(), spans); err == nil {
_ = Send(buf, "127.0.0.1:8126")
} else {
t.Fatal(err)
}
}
Loading

0 comments on commit 1effc1d

Please sign in to comment.