Skip to content

Commit

Permalink
eBPF XDP collector (dmachard#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmachard authored Dec 16, 2022
1 parent b2a235e commit 7743234
Show file tree
Hide file tree
Showing 24 changed files with 1,033 additions and 138 deletions.
17 changes: 17 additions & 0 deletions .vscode/c_cpp_properties.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"configurations": [
{
"name": "Linux",
"includePath": [
"${workspaceFolder}/**",
"${workspaceFolder}/xdp/headers"
],
"defines": [],
"compilerPath": "/usr/bin/clang",
"cStandard": "c17",
"cppStandard": "c++14",
"intelliSenseMode": "linux-clang-x64"
}
],
"version": 4
}
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ DNS-collector also contains DNS parser with [`EDNS`](doc/dnsparser.md) support.
- [`Proxifier`](doc/collectors.md#dns-tap-proxifier) for DNSTap streams
- *Live capture on a network interface*
- [`AF_PACKET`](doc/collectors.md#live-capture-with-af_packet) socket with BPF filter
- [`eBPF XDP`](doc/collectors.md#live-capture-with-ebpf-xdp) ingress traffic
- *Read text or binary files as input*
- Read and tail on [`Plain text`](doc/collectors.md#tail) files
- Ingest [`PCAP`](doc/collectors.md#file-ingestor) or [`DNSTap`](doc/collectors.md#file-ingestor) files by watching a directory
Expand Down
2 changes: 1 addition & 1 deletion collectors/dns_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (d *DnsProcessor) Run(sendTo []chan dnsutils.DnsMessage) {
}

// compute latency if possible
if d.config.Collectors.LiveCapture.CacheSupport {
if d.cacheSupport {
queryport, _ := strconv.Atoi(dm.NetworkInfo.QueryPort)
if len(dm.NetworkInfo.QueryIp) > 0 && queryport > 0 && !dm.DNS.MalformedPacket {
// compute the hash of the query
Expand Down
78 changes: 0 additions & 78 deletions collectors/dnssniffer_windows.go

This file was deleted.

92 changes: 72 additions & 20 deletions collectors/dnssniffer.go → collectors/sniffer_afpacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,53 @@ func Htons(v uint16) int {
return int((v << 8) | (v >> 8))
}

func GetBpfFilter_Ingress(port int) []bpf.Instruction {
// bpf filter: (ip or ip6 ) and (udp or tcp) and port 53
// fragmented packets are ignored
var filter = []bpf.Instruction{
// Load eth.type (2 bytes at offset 12) and push-it in register A
bpf.LoadAbsolute{Off: 12, Size: 2},
// if eth.type == IPv4 continue with the next instruction
bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x0800, SkipTrue: 0, SkipFalse: 8},
// Load ip.proto (1 byte at offset 23) and push-it in register A
bpf.LoadAbsolute{Off: 23, Size: 1},
// ip.proto == UDP ?
bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x11, SkipTrue: 1, SkipFalse: 0},
// ip.proto == TCP ?
bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x6, SkipTrue: 0, SkipFalse: 12},
// load flags and fragment offset (2 bytes at offset 20) to ignore fragmented packet
bpf.LoadAbsolute{Off: 20, Size: 2},
// Only look at the last 13 bits of the data saved in regiter A
// 0x1fff == 0001 1111 1111 1111 (fragment offset)
// If any of the data in fragment offset is true, ignore the packet
bpf.JumpIf{Cond: bpf.JumpBitsSet, Val: 0x1fff, SkipTrue: 10, SkipFalse: 0},
// Load ip.length
// Register X = ip header len * 4
bpf.LoadMemShift{Off: 14},
// Load source port in tcp or udp (2 bytes at offset x+14)
bpf.LoadIndirect{Off: 14, Size: 2},
// source port equal to 53 ?
bpf.JumpIf{Cond: bpf.JumpEqual, Val: uint32(port), SkipTrue: 6, SkipFalse: 7},
// if eth.type == IPv6 continue with the next instruction
bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x86dd, SkipTrue: 0, SkipFalse: 6},
// Load ipv6.nxt (2 bytes at offset 12) and push-it in register A
bpf.LoadAbsolute{Off: 20, Size: 1},
// ip.proto == UDP ?
bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x11, SkipTrue: 1, SkipFalse: 0},
// ip.proto == TCP ?
bpf.JumpIf{Cond: bpf.JumpEqual, Val: 0x6, SkipTrue: 0, SkipFalse: 3},
// Load source port tcp or udp (2 bytes at offset 54)
bpf.LoadAbsolute{Off: 54, Size: 2},
// source port equal to 53 ?
bpf.JumpIf{Cond: bpf.JumpEqual, Val: uint32(port), SkipTrue: 0, SkipFalse: 1},
// Keep the packet and send up to 65k of the packet to userspace
bpf.RetConstant{Val: 0xFFFF},
// Ignore packet
bpf.RetConstant{Val: 0},
}
return filter
}

func GetBpfFilter(port int) []bpf.Instruction {
// bpf filter: (ip or ip6 ) and (udp or tcp) and port 53
// fragmented packets are ignored
Expand Down Expand Up @@ -97,7 +144,7 @@ func RemoveBpfFilter(fd int) (err error) {
return syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_DETACH_FILTER, 0)
}

type DnsSniffer struct {
type AfpacketSniffer struct {
done chan bool
exit chan bool
fd int
Expand All @@ -110,9 +157,9 @@ type DnsSniffer struct {
name string
}

func NewDnsSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *DnsSniffer {
logger.Info("[%s] sniffer collector - enabled", name)
s := &DnsSniffer{
func NewAfpacketSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *AfpacketSniffer {
logger.Info("[%s] AFPACKET collector - enabled", name)
s := &AfpacketSniffer{
done: make(chan bool),
exit: make(chan bool),
config: config,
Expand All @@ -124,39 +171,39 @@ func NewDnsSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *l
return s
}

func (c *DnsSniffer) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] sniffer collector - "+msg, v...)
func (c *AfpacketSniffer) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] AFPACKET collector - "+msg, v...)
}

func (c *DnsSniffer) LogError(msg string, v ...interface{}) {
c.logger.Error("["+c.name+"] sniffer collector - "+msg, v...)
func (c *AfpacketSniffer) LogError(msg string, v ...interface{}) {
c.logger.Error("["+c.name+"] AFPACKET collector - "+msg, v...)
}

func (c *DnsSniffer) GetName() string { return c.name }
func (c *AfpacketSniffer) GetName() string { return c.name }

func (c *DnsSniffer) SetLoggers(loggers []dnsutils.Worker) {
func (c *AfpacketSniffer) SetLoggers(loggers []dnsutils.Worker) {
c.loggers = loggers
}

func (c *DnsSniffer) Loggers() []chan dnsutils.DnsMessage {
func (c *AfpacketSniffer) Loggers() []chan dnsutils.DnsMessage {
channels := []chan dnsutils.DnsMessage{}
for _, p := range c.loggers {
channels = append(channels, p.Channel())
}
return channels
}

func (c *DnsSniffer) ReadConfig() {
c.port = c.config.Collectors.LiveCapture.Port
func (c *AfpacketSniffer) ReadConfig() {
c.port = c.config.Collectors.AfpacketLiveCapture.Port
c.identity = c.config.GetServerIdentity()
c.device = c.config.Collectors.LiveCapture.Device
c.device = c.config.Collectors.AfpacketLiveCapture.Device
}

func (c *DnsSniffer) Channel() chan dnsutils.DnsMessage {
func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage {
return nil
}

func (c *DnsSniffer) Stop() {
func (c *AfpacketSniffer) Stop() {
c.LogInfo("stopping...")

// exit to close properly
Expand All @@ -167,7 +214,7 @@ func (c *DnsSniffer) Stop() {
close(c.done)
}

func (c *DnsSniffer) Listen() error {
func (c *AfpacketSniffer) Listen() error {
// raw socket
fd, err := syscall.Socket(syscall.AF_PACKET, syscall.SOCK_RAW, Htons(syscall.ETH_P_ALL))
if err != nil {
Expand All @@ -188,6 +235,8 @@ func (c *DnsSniffer) Listen() error {
if err := syscall.Bind(fd, &ll); err != nil {
return err
}

c.LogInfo("Binding with success to iface %q (index %d)", iface.Name, iface.Index)
}

// set nano timestamp
Expand All @@ -197,16 +246,19 @@ func (c *DnsSniffer) Listen() error {
}

filter := GetBpfFilter(c.port)
//filter := GetBpfFilter_Ingress(c.port)
err = ApplyBpfFilter(filter, fd)
if err != nil {
return err
}

c.LogInfo("BPF filter applied")

c.fd = fd
return nil
}

func (c *DnsSniffer) Run() {
func (c *AfpacketSniffer) Run() {
c.LogInfo("starting collector...")
defer RemoveBpfFilter(c.fd)
defer syscall.Close(c.fd)
Expand All @@ -219,8 +271,8 @@ func (c *DnsSniffer) Run() {
}

dnsProcessor := NewDnsProcessor(c.config, c.logger, c.name)
dnsProcessor.cacheSupport = c.config.Collectors.LiveCapture.CacheSupport
dnsProcessor.queryTimeout = c.config.Collectors.LiveCapture.QueryTimeout
dnsProcessor.cacheSupport = c.config.Collectors.AfpacketLiveCapture.CacheSupport
dnsProcessor.queryTimeout = c.config.Collectors.AfpacketLiveCapture.QueryTimeout
go dnsProcessor.Run(c.Loggers())

go func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/dmachard/go-logger"
)

type DnsSniffer struct {
type AfpacketSniffer struct {
done chan bool
exit chan bool
loggers []dnsutils.Worker
Expand All @@ -18,9 +18,9 @@ type DnsSniffer struct {
}

// workaround for macos, not yet supported
func NewDnsSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *DnsSniffer {
logger.Info("[%s] collector dns sniffer - enabled", name)
s := &DnsSniffer{
func NewAfpacketSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *AfpacketSniffer {
logger.Info("[%s] AFPACKET sniffer - enabled", name)
s := &AfpacketSniffer{
done: make(chan bool),
exit: make(chan bool),
config: config,
Expand All @@ -32,36 +32,36 @@ func NewDnsSniffer(loggers []dnsutils.Worker, config *dnsutils.Config, logger *l
return s
}

func (c *DnsSniffer) GetName() string { return c.name }
func (c *AfpacketSniffer) GetName() string { return c.name }

func (c *DnsSniffer) SetLoggers(loggers []dnsutils.Worker) {
func (c *AfpacketSniffer) SetLoggers(loggers []dnsutils.Worker) {
c.loggers = loggers
}

func (c *DnsSniffer) LogInfo(msg string, v ...interface{}) {
func (c *AfpacketSniffer) LogInfo(msg string, v ...interface{}) {
c.logger.Info("["+c.name+"] collector dns sniffer - "+msg, v...)
}

func (c *DnsSniffer) LogError(msg string, v ...interface{}) {
func (c *AfpacketSniffer) LogError(msg string, v ...interface{}) {
c.logger.Error("["+c.name+"] collector dns sniffer - "+msg, v...)
}

func (c *DnsSniffer) Loggers() []chan dnsutils.DnsMessage {
func (c *AfpacketSniffer) Loggers() []chan dnsutils.DnsMessage {
channels := []chan dnsutils.DnsMessage{}
for _, p := range c.loggers {
channels = append(channels, p.Channel())
}
return channels
}

func (c *DnsSniffer) ReadConfig() {
func (c *AfpacketSniffer) ReadConfig() {
}

func (c *DnsSniffer) Channel() chan dnsutils.DnsMessage {
func (c *AfpacketSniffer) Channel() chan dnsutils.DnsMessage {
return nil
}

func (c *DnsSniffer) Stop() {
func (c *AfpacketSniffer) Stop() {
c.LogInfo("stopping...")

// exit to close properly
Expand All @@ -72,7 +72,7 @@ func (c *DnsSniffer) Stop() {
close(c.done)
}

func (c *DnsSniffer) Run() {
func (c *AfpacketSniffer) Run() {
c.LogInfo("run terminated")
c.done <- true
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/dmachard/go-logger"
)

func TestDnsSnifferRun(t *testing.T) {
func TestAfpacketSnifferRun(t *testing.T) {
g := loggers.NewFakeLogger()
c := NewDnsSniffer([]dnsutils.Worker{g}, dnsutils.GetFakeConfig(), logger.New(false), "test")
c := NewAfpacketSniffer([]dnsutils.Worker{g}, dnsutils.GetFakeConfig(), logger.New(false), "test")
if err := c.Listen(); err != nil {
log.Fatal("collector sniffer listening error: ", err)
}
Expand Down
Loading

0 comments on commit 7743234

Please sign in to comment.