Skip to content

Commit

Permalink
latency as transform (dmachard#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmachard authored Mar 17, 2023
1 parent 225010f commit 666721d
Show file tree
Hide file tree
Showing 33 changed files with 420 additions and 210 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ Additionally, DNS-collector also support

**Transformers**:

- [`Latency Computing`](doc/transformers.md#dns-latency)
- Compute latency between replies and queries
- Detect evicted queries

- [`Traffic filtering`](doc/transformers.md#dns-filtering)
- Downsampling
- Dropping per Qname, QueryIP or Rcode
Expand Down Expand Up @@ -109,6 +113,7 @@ You will find below some examples of configuration to manage your DNS logs.
- [x] [Filtering incoming traffic with downsample and whitelist of domains](example-config/use-case-9.yml)
- [x] [Transform all domains to lowercase](example-config/use-case-10.yml)
- [x] [Add geographical metadata with GeoIP](example-config/use-case-11.yml)
- [x] [Count the number of no responses with outgoing queries](example-config/use-case-18.yml)

- Capture DNS traffic from FRSTRM/dnstap files
- [x] [Save incoming DNStap streams to file (frstrm)](example-config/use-case-13.yml)
Expand Down
53 changes: 10 additions & 43 deletions collectors/dns_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package collectors

import (
"fmt"
"hash/fnv"
"strconv"
"strings"
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
Expand All @@ -20,13 +17,11 @@ func GetFakeDns() ([]byte, error) {
}

type DnsProcessor struct {
done chan bool
recvFrom chan dnsutils.DnsMessage
logger *logger.Logger
config *dnsutils.Config
cacheSupport bool
queryTimeout int
name string
done chan bool
recvFrom chan dnsutils.DnsMessage
logger *logger.Logger
config *dnsutils.Config
name string
}

func NewDnsProcessor(config *dnsutils.Config, logger *logger.Logger, name string) DnsProcessor {
Expand All @@ -44,10 +39,7 @@ func NewDnsProcessor(config *dnsutils.Config, logger *logger.Logger, name string
return d
}

func (d *DnsProcessor) ReadConfig() {
d.cacheSupport = false
d.queryTimeout = 5
}
func (d *DnsProcessor) ReadConfig() {}

func (c *DnsProcessor) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] dns processor - "+msg, v...)
Expand Down Expand Up @@ -77,12 +69,8 @@ func (d *DnsProcessor) Stop() {

func (d *DnsProcessor) Run(sendTo []chan dnsutils.DnsMessage) {

// dns cache to compute latency between response and query
cache_ttl := dnsutils.NewDnsCache(time.Duration(d.queryTimeout) * time.Second)
d.LogInfo("dns cached enabled: %t", d.cacheSupport)

// prepare enabled transformers
subprocessors := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name)
subprocessors := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, sendTo)

// read incoming dns message
d.LogInfo("running... waiting incoming dns message")
Expand Down Expand Up @@ -127,35 +115,14 @@ func (d *DnsProcessor) Run(sendTo []chan dnsutils.DnsMessage) {
}
}

// compute latency if possible
if d.cacheSupport {
queryport, _ := strconv.Atoi(dm.NetworkInfo.QueryPort)
if len(dm.NetworkInfo.QueryIp) > 0 && queryport > 0 && !dm.DNS.MalformedPacket {
// compute the hash of the query
hash_data := []string{dm.NetworkInfo.QueryIp, dm.NetworkInfo.QueryPort, strconv.Itoa(dm.DNS.Id)}

hashfnv := fnv.New64a()
hashfnv.Write([]byte(strings.Join(hash_data[:], "+")))

if dm.DNS.Type == dnsutils.DnsQuery {
cache_ttl.Set(hashfnv.Sum64(), dm.DnsTap.Timestamp)
} else {
value, ok := cache_ttl.Get(hashfnv.Sum64())
if ok {
dm.DnsTap.Latency = dm.DnsTap.Timestamp - value
}
}
}
}

// convert latency to human
dm.DnsTap.LatencySec = fmt.Sprintf("%.6f", dm.DnsTap.Latency)

// apply all enabled transformers
if subprocessors.ProcessMessage(&dm) == transformers.RETURN_DROP {
continue
}

// convert latency to human
dm.DnsTap.LatencySec = fmt.Sprintf("%.6f", dm.DnsTap.Latency)

// dispatch dns message to all generators
for i := range sendTo {
sendTo[i] <- dm
Expand Down
34 changes: 4 additions & 30 deletions collectors/dnstap_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package collectors

import (
"fmt"
"hash/fnv"
"net"
"strconv"
"strings"
"time"

"github.com/dmachard/go-dnscollector/dnsutils"
Expand Down Expand Up @@ -100,12 +98,8 @@ func (d *DnstapProcessor) Stop() {
func (d *DnstapProcessor) Run(sendTo []chan dnsutils.DnsMessage) {
dt := &dnstap.Dnstap{}

// dns cache to compute latency between response and query
cache_ttl := dnsutils.NewDnsCache(time.Duration(d.config.Collectors.Dnstap.QueryTimeout) * time.Second)
d.LogInfo("dns cached enabled: %t", d.config.Collectors.Dnstap.CacheSupport)

// prepare enabled transformers
subprocessors := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name)
subprocessors := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, sendTo)

// read incoming dns message
d.LogInfo("running... waiting incoming dns message")
Expand Down Expand Up @@ -195,34 +189,14 @@ func (d *DnstapProcessor) Run(sendTo []chan dnsutils.DnsMessage) {
}
}

// compute latency if possible
if d.config.Collectors.Dnstap.CacheSupport {
if len(dm.NetworkInfo.QueryIp) > 0 && queryport > 0 && !dm.DNS.MalformedPacket {
// compute the hash of the query
hash_data := []string{dm.NetworkInfo.QueryIp, dm.NetworkInfo.QueryPort, strconv.Itoa(dm.DNS.Id)}

hashfnv := fnv.New64a()
hashfnv.Write([]byte(strings.Join(hash_data[:], "+")))

if dm.DNS.Type == dnsutils.DnsQuery {
cache_ttl.Set(hashfnv.Sum64(), dm.DnsTap.Timestamp)
} else {
value, ok := cache_ttl.Get(hashfnv.Sum64())
if ok {
dm.DnsTap.Latency = dm.DnsTap.Timestamp - value
}
}
}
}

// convert latency to human
dm.DnsTap.LatencySec = fmt.Sprintf("%.6f", dm.DnsTap.Latency)

// apply all enabled transformers
if subprocessors.ProcessMessage(&dm) == transformers.RETURN_DROP {
continue
}

// convert latency to human
dm.DnsTap.LatencySec = fmt.Sprintf("%.6f", dm.DnsTap.Latency)

// dispatch dns message to all generators
for i := range sendTo {
sendTo[i] <- dm
Expand Down
1 change: 0 additions & 1 deletion collectors/file_ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ func (c *FileIngestor) ProcessPcap(filePath string) {

// remove event timer for this file
c.RemoveEvent(filePath)

}

func (c *FileIngestor) ProcessDnstap(filePath string) error {
Expand Down
2 changes: 1 addition & 1 deletion collectors/file_tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *Tail) Run() {
}

// prepare enabled transformers
subprocessors := transformers.NewTransforms(&c.config.IngoingTransformers, c.logger, c.name)
subprocessors := transformers.NewTransforms(&c.config.IngoingTransformers, c.logger, c.name, c.Loggers())

// init dns message
dm := dnsutils.DnsMessage{}
Expand Down
2 changes: 1 addition & 1 deletion collectors/powerdns_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (d *PdnsProcessor) Run(sendTo []chan dnsutils.DnsMessage) {
pbdm := &powerdns_protobuf.PBDNSMessage{}

// prepare enabled transformers
subprocessors := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name)
subprocessors := transformers.NewTransforms(&d.config.IngoingTransformers, d.logger, d.name, sendTo)

// read incoming dns message
d.LogInfo("running... waiting incoming dns message")
Expand Down
2 changes: 0 additions & 2 deletions collectors/sniffer_afpacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,6 @@ func (c *AfpacketSniffer) Run() {
}

dnsProcessor := NewDnsProcessor(c.config, c.logger, c.name)
dnsProcessor.cacheSupport = c.config.Collectors.AfpacketLiveCapture.CacheSupport
dnsProcessor.queryTimeout = c.config.Collectors.AfpacketLiveCapture.QueryTimeout
go dnsProcessor.Run(c.Loggers())

dnsChan := make(chan netlib.DnsPacket)
Expand Down
1 change: 0 additions & 1 deletion collectors/sniffer_xdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func (c *XdpSniffer) Run() {
c.LogInfo("starting collector...")

dnsProcessor := NewDnsProcessor(c.config, c.logger, c.name)
dnsProcessor.cacheSupport = false
go dnsProcessor.Run(c.Loggers())

iface, err := net.InterfaceByName("wlp2s0")
Expand Down
2 changes: 0 additions & 2 deletions collectors/tzsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ func (c *TzspSniffer) Run() {
}

dnsProcessor := NewDnsProcessor(c.config, c.logger, c.name)
dnsProcessor.cacheSupport = c.config.Collectors.Tzsp.CacheSupport
dnsProcessor.queryTimeout = c.config.Collectors.Tzsp.QueryTimeout

go dnsProcessor.Run(c.Loggers())

Expand Down
27 changes: 11 additions & 16 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ global:
# - ra: recursion available
# - ad: authenticated data
# - edns-csubnet: client subnet
# - df: flag when ip defragmentation occured
# - tr: flag when tcp reassembled occured
# - df: ip defragmentation flag
# - tr: tcp reassembled flag
text-format: "timestamp-rfc3339ns identity operation rcode queryip queryport family protocol length qname qtype latency"

# create your dns collector, please refer bellow to see the list
Expand Down Expand Up @@ -97,11 +97,6 @@ multiplexer:
# cert-file: ""
# # private key server file
# key-file: ""
# # The cache is used to compute latency between replies and queries
# # This cache can be enabled if your dns server doesn't add the latency information
# cache-support: false
# # Ttl in second, max time to keep the query record in memory cache
# query-timeout: 5
# # Sets the socket receive buffer in bytes SO_RCVBUF, set to zero to use the default system value
# sock-rcvbuf: 0

Expand All @@ -128,10 +123,6 @@ multiplexer:
# port: 53
# # if "" bind on all interfaces
# device: wlp2s0
# # The cache is used to compute latency between replies and queries
# cache-support: true
# # Ttl in second, max time to keep the query record in memory cache
# query-timeout: 5

# # live capture with XDP
# xdp-sniffer:
Expand Down Expand Up @@ -193,11 +184,6 @@ multiplexer:
# listen-ip: 0.0.0.0
# # listen on port
# listen-port: 10000
# # The cache is used to compute latency between replies and queries
# # This cache can be enabled if your dns server doesn't add the latency information
# cache-support: false
# # Ttl in second, max time to keep the query record in memory cache
# query-timeout: 5

################################################
# list of supported loggers
Expand Down Expand Up @@ -430,6 +416,15 @@ multiplexer:
# list of transforms to apply on collectors or loggers
################################################

# # Use this transformer to compute latency and detect timeout on queries
# latency:
# # Measure latency between replies and queries
# measure-latency: false
# # Detect queries timeout
# detect-evicted-queries: false
# # timeout in second for queries
# queries-timeout: 2

# # Use this option to protect user privacy
# user-privacy:
# # IP-Addresses are anonymities by zeroing the host-part of an address.
Expand Down
41 changes: 0 additions & 41 deletions dnsutils/cache.go

This file was deleted.

Loading

0 comments on commit 666721d

Please sign in to comment.