Skip to content

Commit

Permalink
fix timestamp convert float64 to int64 (dmachard#285)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmachard authored Apr 22, 2023
1 parent 1d7e40e commit e57857f
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 56 deletions.
2 changes: 1 addition & 1 deletion collectors/dns_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (d *DnsProcessor) Run(sendTo []chan dnsutils.DnsMessage) {
subprocessors.InitDnsMessageFormat(&dm)

// compute timestamp
dm.DnsTap.Timestamp = float64(dm.DnsTap.TimeSec) + float64(dm.DnsTap.TimeNsec)/1e9
ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec))
dm.DnsTap.Timestamp = ts.UnixNano()
dm.DnsTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano)

// decode the dns payload
Expand Down
2 changes: 1 addition & 1 deletion collectors/dnstap_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func (d *DnstapProcessor) Run(sendTo []chan dnsutils.DnsMessage) {
}

// compute timestamp
dm.DnsTap.Timestamp = float64(dm.DnsTap.TimeSec) + float64(dm.DnsTap.TimeNsec)/1e9
ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec))
dm.DnsTap.Timestamp = ts.UnixNano()
dm.DnsTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano)

// decode the dns payload to get id, rcode and the number of question
Expand Down
2 changes: 1 addition & 1 deletion collectors/file_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ func (c *Tail) Run() {
}

// compute timestamp
dm.DnsTap.Timestamp = float64(dm.DnsTap.TimeSec) + float64(dm.DnsTap.TimeNsec)/1e9
ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec))
dm.DnsTap.Timestamp = ts.UnixNano()
dm.DnsTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano)

// fake dns packet
Expand Down
2 changes: 1 addition & 1 deletion collectors/powerdns_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func (d *PdnsProcessor) Run(sendTo []chan dnsutils.DnsMessage) {
}

// compute timestamp
dm.DnsTap.Timestamp = float64(dm.DnsTap.TimeSec) + float64(dm.DnsTap.TimeNsec)/1e9
ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec))
dm.DnsTap.Timestamp = ts.UnixNano()
dm.DnsTap.TimestampRFC3339 = ts.UTC().Format(time.RFC3339Nano)

dm.DNS.Qname = pbdm.Question.GetQName()
Expand Down
14 changes: 5 additions & 9 deletions dnsutils/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type DnsTap struct {
Identity string `json:"identity" msgpack:"identity"`
Version string `json:"version" msgpack:"version"`
TimestampRFC3339 string `json:"timestamp-rfc3339ns" msgpack:"timestamp-rfc3339ns"`
Timestamp float64 `json:"-" msgpack:"-"`
Timestamp int64 `json:"-" msgpack:"-"`
TimeSec int `json:"-" msgpack:"-"`
TimeNsec int `json:"-" msgpack:"-"`
Latency float64 `json:"-" msgpack:"-"`
Expand Down Expand Up @@ -219,7 +219,6 @@ func (dm *DnsMessage) Init() {
Z: 0,
Options: []DnsOption{},
}

}

func (dm *DnsMessage) handleGeoIPDirectives(directives []string, s *bytes.Buffer) {
Expand Down Expand Up @@ -383,16 +382,14 @@ func (dm *DnsMessage) Bytes(format []string, fieldDelimiter string, fieldBoundar
s.WriteString(strconv.Itoa(len(dm.DNS.DnsRRs.Answers)))
case directive == "id":
s.WriteString(strconv.Itoa(dm.DNS.Id))
case directive == "timestamp": // keep it just for backward compatibility
s.WriteString(dm.DnsTap.TimestampRFC3339)
case directive == "timestamp-rfc3339ns":
case directive == "timestamp-rfc3339ns", directive == "timestamp":
s.WriteString(dm.DnsTap.TimestampRFC3339)
case directive == "timestamp-unixms":
s.WriteString(fmt.Sprintf("%.3f", dm.DnsTap.Timestamp))
s.WriteString(fmt.Sprintf("%d", dm.DnsTap.Timestamp/1000000))
case directive == "timestamp-unixus":
s.WriteString(fmt.Sprintf("%.6f", dm.DnsTap.Timestamp))
s.WriteString(fmt.Sprintf("%d", dm.DnsTap.Timestamp/1000))
case directive == "timestamp-unixns":
s.WriteString(fmt.Sprintf("%.9f", dm.DnsTap.Timestamp))
s.WriteString(fmt.Sprintf("%d", dm.DnsTap.Timestamp))
case directive == "localtime":
ts := time.Unix(int64(dm.DnsTap.TimeSec), int64(dm.DnsTap.TimeNsec))
s.WriteString(ts.Format("2006-01-02 15:04:05.999999999"))
Expand Down Expand Up @@ -433,7 +430,6 @@ func (dm *DnsMessage) Bytes(format []string, fieldDelimiter string, fieldBoundar
case directive == "latency":
s.WriteString(dm.DnsTap.LatencySec)
case directive == "malformed":
//s.WriteString(strconv.Itoa(dm.DNS.MalformedPacket))
if dm.DNS.MalformedPacket {
s.WriteString("PKTERR")
} else {
Expand Down
172 changes: 140 additions & 32 deletions dnsutils/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,157 @@ import (
"testing"
)

func TestDnsMessage_ToString(t *testing.T) {
config := GetFakeConfig()

dm := DnsMessage{}
dm.Init()
func TestDnsMessage_TextFormat_ToString(t *testing.T) {

line := dm.String(strings.Fields(config.Global.TextFormat),
config.Global.TextFormatDelimiter,
config.Global.TextFormatBoundary)
config := GetFakeConfig()

if line != "- - - - - - - - 0b - - -" {
t.Errorf("text dns message invalid; %s", line)
testcases := []struct {
name string
delimiter string
boundary string
format string
qname string
expected string
}{
{
name: "default",
delimiter: config.Global.TextFormatDelimiter,
boundary: config.Global.TextFormatBoundary,
format: config.Global.TextFormat,
qname: "dnscollector.fr",
expected: "- - - - - - - - 0b dnscollector.fr - -",
},
{
name: "custom_delimiter",
delimiter: ";",
boundary: config.Global.TextFormatBoundary,
format: config.Global.TextFormat,
qname: "dnscollector.fr",
expected: "-;-;-;-;-;-;-;-;0b;dnscollector.fr;-;-",
},
{
name: "qname_quote",
delimiter: config.Global.TextFormatDelimiter,
boundary: config.Global.TextFormatBoundary,
format: config.Global.TextFormat,
qname: "dns collector.fr",
expected: "- - - - - - - - 0b \"dns collector.fr\" - -",
},
{
name: "default_boundary",
delimiter: config.Global.TextFormatDelimiter,
boundary: config.Global.TextFormatBoundary,
format: config.Global.TextFormat,
qname: "dns\"coll tor\".fr",
expected: "- - - - - - - - 0b \"dns\\\"coll tor\\\".fr\" - -",
},
{
name: "custom_boundary",
delimiter: config.Global.TextFormatDelimiter,
boundary: "!",
format: config.Global.TextFormat,
qname: "dnscoll tor.fr",
expected: "- - - - - - - - 0b !dnscoll tor.fr! - -",
},
}
}

func TestDnsMessage_TextDelimiter(t *testing.T) {
config := GetFakeConfig()
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
dm := DnsMessage{}
dm.Init()

dm := DnsMessage{}
dm.Init()
dm.DNS.Qname = tc.qname

line := dm.String(strings.Fields(config.Global.TextFormat),
";",
config.Global.TextFormatBoundary)

if line != "-;-;-;-;-;-;-;-;0b;-;-;-" {
t.Errorf("text dns message invalid; %s", line)
line := dm.String(strings.Fields(tc.format), tc.delimiter, tc.boundary)
if line != tc.expected {
t.Errorf("Want: %s, got: %s", tc.expected, line)
}
})
}
}

func TestDnsMessage_TextBoundary(t *testing.T) {
func TestDnsMessage_TextFormat_DefaultDirectives(t *testing.T) {
config := GetFakeConfig()

dm := DnsMessage{}
dm.Init()

dm.DNS.Qname = "dns \"collector"

line := dm.String(strings.Fields(config.Global.TextFormat),
config.Global.TextFormatDelimiter,
config.Global.TextFormatBoundary)
testcases := []struct {
name string
format string
dm DnsMessage
expected string
}{
{
format: "timestamp-rfc3339ns timestamp",
dm: DnsMessage{DnsTap: DnsTap{TimestampRFC3339: "2023-04-22T09:17:02.906922231Z"}},
expected: "2023-04-22T09:17:02.906922231Z 2023-04-22T09:17:02.906922231Z",
},
{
format: "timestamp-unixns timestamp-unixus timestamp-unixms",
dm: DnsMessage{DnsTap: DnsTap{Timestamp: 1682152174001850960}},
expected: "1682152174001850960 1682152174001850 1682152174001",
},
{
format: "latency",
dm: DnsMessage{DnsTap: DnsTap{LatencySec: "0.00001"}},
expected: "0.00001",
},
{
format: "qname qtype opcode",
dm: DnsMessage{DNS: Dns{Qname: "dnscollector.fr", Qtype: "AAAA", Opcode: 42}},
expected: "dnscollector.fr AAAA 42",
},
{
format: "operation",
dm: DnsMessage{DnsTap: DnsTap{Operation: "CLIENT_QUERY"}},
expected: "CLIENT_QUERY",
},
{
format: "family protocol",
dm: DnsMessage{NetworkInfo: DnsNetInfo{Family: "IPv4", Protocol: "UDP"}},
expected: "IPv4 UDP",
},
{
format: "length",
dm: DnsMessage{DNS: Dns{Length: 42}},
expected: "42b",
},
{
format: "malformed",
dm: DnsMessage{DNS: Dns{MalformedPacket: true}},
expected: "PKTERR",
},
{
format: "tc aa ra ad",
dm: DnsMessage{DNS: Dns{Flags: DnsFlags{TC: true, AA: true, RA: true, AD: true}}},
expected: "TC AA RA AD",
},
{
format: "repeated",
dm: DnsMessage{DNS: Dns{Repeated: 42}},
expected: "42",
},
{
format: "df tr",
dm: DnsMessage{NetworkInfo: DnsNetInfo{IpDefragmented: true, TcpReassembled: true}},
expected: "DF TR",
},
{
format: "queryip queryport",
dm: DnsMessage{NetworkInfo: DnsNetInfo{QueryIp: "1.2.3.4", QueryPort: "4200"}},
expected: "1.2.3.4 4200",
},
{
format: "responseip responseport",
dm: DnsMessage{NetworkInfo: DnsNetInfo{ResponseIp: "1.2.3.4", ResponsePort: "4200"}},
expected: "1.2.3.4 4200",
},
}

if line != "- - - - - - - - 0b \"dns \\\"collector\" - -" {
t.Errorf("text dns message invalid; %s", line)
for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) {
line := tc.dm.String(strings.Fields(tc.format), config.Global.TextFormatDelimiter, config.Global.TextFormatBoundary)
if line != tc.expected {
t.Errorf("Want: %s, got: %s", tc.expected, line)
}
})
}
}
11 changes: 6 additions & 5 deletions transformers/latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,24 @@ func (mp *MapQueries) Delete(key uint64) {
type HashQueries struct {
sync.RWMutex
ttl time.Duration
kv map[uint64]float64
kv map[uint64]int64
}

func NewHashQueries(ttl time.Duration) HashQueries {
return HashQueries{
ttl: ttl,
kv: make(map[uint64]float64),
kv: make(map[uint64]int64),
}
}

func (mp *HashQueries) Get(key uint64) (value float64, ok bool) {
func (mp *HashQueries) Get(key uint64) (value int64, ok bool) {
mp.RLock()
defer mp.RUnlock()
result, ok := mp.kv[key]
return result, ok
}

func (mp *HashQueries) Set(key uint64, value float64) {
func (mp *HashQueries) Set(key uint64, value int64) {
mp.Lock()
defer mp.Unlock()
mp.kv[key] = value
Expand Down Expand Up @@ -131,7 +131,8 @@ func (s *LatencyProcessor) MeasureLatency(dm *dnsutils.DnsMessage) {
value, ok := s.hashQueries.Get(key)
if ok {
s.hashQueries.Delete(key)
dm.DnsTap.Latency = dm.DnsTap.Timestamp - value
latency := float64(dm.DnsTap.Timestamp-value) / float64(1000000000)
dm.DnsTap.Latency = latency
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions transformers/latency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func Test_HashQueries(t *testing.T) {
mapttl := NewHashQueries(2 * time.Second)

// Set a new key/value
mapttl.Set(uint64(1), float64(0))
mapttl.Set(uint64(1), int64(0))

// Get value according to the key
_, ok := mapttl.Get(uint64(1))
Expand All @@ -26,7 +26,7 @@ func Test_HashQueries_Expire(t *testing.T) {
mapttl := NewHashQueries(1 * time.Second)

// Set a new key/value
mapttl.Set(uint64(1), float64(0))
mapttl.Set(uint64(1), int64(0))

// sleep during 2 seconds
time.Sleep(2 * time.Second)
Expand All @@ -43,15 +43,15 @@ func Benchmark_HashQueries_Set(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
mapexpire.Set(uint64(i), float64(i))
mapexpire.Set(uint64(i), int64(i))
}
}

func Benchmark_HashQueries_Delete(b *testing.B) {
mapexpire := NewHashQueries(60 * time.Second)

for i := 0; i < b.N; i++ {
mapexpire.Set(uint64(i), float64(i))
mapexpire.Set(uint64(i), int64(i))
}

b.ResetTimer()
Expand All @@ -65,7 +65,7 @@ func Benchmark_HashQueries_Get(b *testing.B) {
mapexpire := NewHashQueries(60 * time.Second)

for i := 0; i < b.N; i++ {
mapexpire.Set(uint64(i), float64(i))
mapexpire.Set(uint64(i), int64(i))
}

b.ResetTimer()
Expand All @@ -82,7 +82,7 @@ func Benchmark_HashQueries_Get(b *testing.B) {
func Benchmark_HashQueries_ConcurrentGet(b *testing.B) {
mapexpire := NewHashQueries(60 * time.Second)
for i := 0; i < b.N; i++ {
mapexpire.Set(uint64(i), float64(i))
mapexpire.Set(uint64(i), int64(i))
}

var wg sync.WaitGroup
Expand Down

0 comments on commit e57857f

Please sign in to comment.