Skip to content

Commit

Permalink
fix large dnstap frame 65Kb (dmachard#270)
Browse files Browse the repository at this point in the history
* fix large dnstap frame
  • Loading branch information
dmachard authored Apr 15, 2023
1 parent a73c15b commit c550ac0
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 214 deletions.
5 changes: 4 additions & 1 deletion collectors/dnstap.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Dnstap struct {
logger *logger.Logger
name string
connMode string
stopping bool
}

func NewDnstap(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *Dnstap {
Expand Down Expand Up @@ -123,7 +124,8 @@ func (c *Dnstap) HandleConn(conn net.Conn) {
}

// process incoming frame and send it to dnstap consumer channel
if err := fs.ProcessFrame(dnstapProcessor.GetChannel()); err != nil {
err := fs.ProcessFrame(dnstapProcessor.GetChannel())
if err != nil && !c.stopping {
c.LogError("transport error: %s", err)
}

Expand All @@ -139,6 +141,7 @@ func (c *Dnstap) Channel() chan dnsutils.DnsMessage {

func (c *Dnstap) Stop() {
c.LogInfo("stopping...")
c.stopping = true

// closing properly current connections if exists
for _, conn := range c.conns {
Expand Down
8 changes: 4 additions & 4 deletions collectors/dnstap_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"google.golang.org/protobuf/proto"
)

func TestDnstapProcessor(t *testing.T) {
func Test_DnstapProcessor(t *testing.T) {
logger := logger.New(true)
var o bytes.Buffer
logger.SetOutput(&o)
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestDnstapProcessor(t *testing.T) {
}
}

func TestDnstapProcessor_MalformedDnsHeader(t *testing.T) {
func Test_DnstapProcessor_MalformedDnsHeader(t *testing.T) {
logger := logger.New(true)
var o bytes.Buffer
logger.SetOutput(&o)
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestDnstapProcessor_MalformedDnsHeader(t *testing.T) {
}
}

func TestDnstapProcessor_MalformedDnsQuestion(t *testing.T) {
func Test_DnstapProcessor_MalformedDnsQuestion(t *testing.T) {
logger := logger.New(true)
var o bytes.Buffer
logger.SetOutput(&o)
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestDnstapProcessor_MalformedDnsQuestion(t *testing.T) {
}
}

func TestDnstapProcessor_MalformedDnsAnswer(t *testing.T) {
func Test_DnstapProcessor_MalformedDnsAnswer(t *testing.T) {
logger := logger.New(true)
var o bytes.Buffer
logger.SetOutput(&o)
Expand Down
5 changes: 4 additions & 1 deletion collectors/dnstap_proxifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type DnstapProxifier struct {
config *dnsutils.Config
logger *logger.Logger
name string
stopping bool
}

func NewDnstapProxifier(loggers []dnsutils.Worker, config *dnsutils.Config, logger *logger.Logger, name string) *DnstapProxifier {
Expand Down Expand Up @@ -108,7 +109,8 @@ func (c *DnstapProxifier) HandleConn(conn net.Conn) {
}

// process incoming frame and send it to recv channel
if err := fs.ProcessFrame(recvChan); err != nil {
err := fs.ProcessFrame(recvChan)
if err != nil && !c.stopping {
c.LogError("transport error: %s", err)
}

Expand All @@ -123,6 +125,7 @@ func (c *DnstapProxifier) Channel() chan dnsutils.DnsMessage {

func (c *DnstapProxifier) Stop() {
c.LogInfo("stopping...")
c.stopping = true

// closing properly current connections if exists
for _, conn := range c.conns {
Expand Down
298 changes: 196 additions & 102 deletions collectors/dnstap_proxifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,110 +14,204 @@ import (
"google.golang.org/protobuf/proto"
)

func TestDnstapProxifier_TcpSocket(t *testing.T) {
g := loggers.NewFakeLogger()

config := dnsutils.GetFakeConfig()
config.Collectors.DnstapProxifier.ListenPort = 6100

c := NewDnstapProxifier([]dnsutils.Worker{g}, config, logger.New(false), "test")
if err := c.Listen(); err != nil {
log.Fatal("collector dnstap relay tcp listening error: ", err)
}
go c.Run()

conn, err := net.Dial(dnsutils.SOCKET_TCP, ":6100")
if err != nil {
t.Error("could not connect to TCP server: ", err)
}
defer conn.Close()

r := bufio.NewReader(conn)
w := bufio.NewWriter(conn)
fs := framestream.NewFstrm(r, w, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true)
if err := fs.InitSender(); err != nil {
t.Fatalf("framestream init error: %s", err)
} else {
frame := &framestream.Frame{}

// get fake dns question
dnsquery, err := GetFakeDns()
if err != nil {
t.Fatalf("dns question pack error")
}

// get fake dnstap message
dt_query := GetFakeDnstap(dnsquery)

// serialize to bytes
data, err := proto.Marshal(dt_query)
if err != nil {
t.Fatalf("dnstap proto marshal error %s", err)
}

// send query
frame.Write(data)
if err := fs.SendFrame(frame); err != nil {
t.Fatalf("send frame error %s", err)
}
func Test_DnstapProxifier(t *testing.T) {
testcases := []struct {
name string
mode string
address string
listen_port int
}{
{
name: "tcp_default",
mode: dnsutils.SOCKET_TCP,
address: ":6000",
listen_port: 0,
},
{
name: "tcp_custom_port",
mode: dnsutils.SOCKET_TCP,
address: ":7100",
listen_port: 7100,
},
{
name: "unix_default",
mode: dnsutils.SOCKET_UNIX,
address: "/tmp/dnscollector_relay.sock",
listen_port: 0,
},
}

// waiting message in channel
msg := <-g.Channel()
if len(msg.DnsTap.Payload) == 0 {
t.Errorf("DNStap payload is empty")
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
g := loggers.NewFakeLogger()

config := dnsutils.GetFakeConfig()
if tc.listen_port > 0 {
config.Collectors.DnstapProxifier.ListenPort = tc.listen_port
}
if tc.mode == dnsutils.SOCKET_UNIX {
config.Collectors.DnstapProxifier.SockPath = tc.address
}

c := NewDnstapProxifier([]dnsutils.Worker{g}, config, logger.New(false), "test")
if err := c.Listen(); err != nil {
log.Fatal("collector dnstap relay error: ", err)
}

go c.Run()

conn, err := net.Dial(tc.mode, tc.address)
if err != nil {
t.Error("could not connect: ", err)
}
defer conn.Close()

r := bufio.NewReader(conn)
w := bufio.NewWriter(conn)
fs := framestream.NewFstrm(r, w, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true)
if err := fs.InitSender(); err != nil {
t.Fatalf("framestream init error: %s", err)
} else {
frame := &framestream.Frame{}

// get fake dns question
dnsquery, err := GetFakeDns()
if err != nil {
t.Fatalf("dns question pack error")
}

// get fake dnstap message
dt_query := GetFakeDnstap(dnsquery)

// serialize to bytes
data, err := proto.Marshal(dt_query)
if err != nil {
t.Fatalf("dnstap proto marshal error %s", err)
}

// send query
frame.Write(data)
if err := fs.SendFrame(frame); err != nil {
t.Fatalf("send frame error %s", err)
}
}

// waiting message in channel
msg := <-g.Channel()
if len(msg.DnsTap.Payload) == 0 {
t.Errorf("DNStap payload is empty")
}

c.Stop()
})
}
}

func TestDnstapProxifier_UnixSocket(t *testing.T) {
g := loggers.NewFakeLogger()
config := dnsutils.GetFakeConfig()
config.Collectors.DnstapProxifier.SockPath = "/tmp/dnscollector_relay.sock"
c := NewDnstapProxifier([]dnsutils.Worker{g}, config, logger.New(false), "test")
if err := c.Listen(); err != nil {
log.Fatal("collector dnstap replay unix listening error: ", err)
}
go c.Run()

conn, err := net.Dial(dnsutils.SOCKET_UNIX, config.Collectors.DnstapProxifier.SockPath)
if err != nil {
t.Error("could not connect to unix socket: ", err)
}
defer conn.Close()

r := bufio.NewReader(conn)
w := bufio.NewWriter(conn)
fs := framestream.NewFstrm(r, w, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true)
if err := fs.InitSender(); err != nil {
t.Fatalf("framestream init error: %s", err)
} else {
frame := &framestream.Frame{}

// get fake dns question
dnsquery, err := GetFakeDns()
if err != nil {
t.Fatalf("dns question pack error")
}

// get fake dnstap message
dt_query := GetFakeDnstap(dnsquery)

// serialize to bytes
data, err := proto.Marshal(dt_query)
if err != nil {
t.Fatalf("dnstap proto marshal error %s", err)
}

// send query
frame.Write(data)
if err := fs.SendFrame(frame); err != nil {
t.Fatalf("send frame error %s", err)
}
}

// waiting message in channel
msg := <-g.Channel()
if len(msg.DnsTap.Payload) == 0 {
t.Errorf("DNStap payload is empty")
}
}
// func TestDnstapProxifier_TcpSocket(t *testing.T) {
// g := loggers.NewFakeLogger()

// config := dnsutils.GetFakeConfig()
// config.Collectors.DnstapProxifier.ListenPort = 6100
// config.Collectors.DnstapProxifier.SockPath = "/tmp/dnscollector_relay.sock"

// c := NewDnstapProxifier([]dnsutils.Worker{g}, config, logger.New(false), "test")
// if err := c.Listen(); err != nil {
// log.Fatal("collector dnstap relay tcp listening error: ", err)
// }
// go c.Run()

// conn, err := net.Dial(dnsutils.SOCKET_TCP, ":6100")
// if err != nil {
// t.Error("could not connect to TCP server: ", err)
// }
// defer conn.Close()

// r := bufio.NewReader(conn)
// w := bufio.NewWriter(conn)
// fs := framestream.NewFstrm(r, w, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true)
// if err := fs.InitSender(); err != nil {
// t.Fatalf("framestream init error: %s", err)
// } else {
// frame := &framestream.Frame{}

// // get fake dns question
// dnsquery, err := GetFakeDns()
// if err != nil {
// t.Fatalf("dns question pack error")
// }

// // get fake dnstap message
// dt_query := GetFakeDnstap(dnsquery)

// // serialize to bytes
// data, err := proto.Marshal(dt_query)
// if err != nil {
// t.Fatalf("dnstap proto marshal error %s", err)
// }

// // send query
// frame.Write(data)
// if err := fs.SendFrame(frame); err != nil {
// t.Fatalf("send frame error %s", err)
// }
// }

// // waiting message in channel
// msg := <-g.Channel()
// if len(msg.DnsTap.Payload) == 0 {
// t.Errorf("DNStap payload is empty")
// }
// }

// func TestDnstapProxifier_UnixSocket(t *testing.T) {
// g := loggers.NewFakeLogger()
// config := dnsutils.GetFakeConfig()
// config.Collectors.DnstapProxifier.SockPath = "/tmp/dnscollector_relay.sock"
// c := NewDnstapProxifier([]dnsutils.Worker{g}, config, logger.New(false), "test")
// if err := c.Listen(); err != nil {
// log.Fatal("collector dnstap replay unix listening error: ", err)
// }
// go c.Run()

// conn, err := net.Dial(dnsutils.SOCKET_UNIX, config.Collectors.DnstapProxifier.SockPath)
// if err != nil {
// t.Error("could not connect to unix socket: ", err)
// }
// defer conn.Close()

// r := bufio.NewReader(conn)
// w := bufio.NewWriter(conn)
// fs := framestream.NewFstrm(r, w, conn, 5*time.Second, []byte("protobuf:dnstap.Dnstap"), true)
// if err := fs.InitSender(); err != nil {
// t.Fatalf("framestream init error: %s", err)
// } else {
// frame := &framestream.Frame{}

// // get fake dns question
// dnsquery, err := GetFakeDns()
// if err != nil {
// t.Fatalf("dns question pack error")
// }

// // get fake dnstap message
// dt_query := GetFakeDnstap(dnsquery)

// // serialize to bytes
// data, err := proto.Marshal(dt_query)
// if err != nil {
// t.Fatalf("dnstap proto marshal error %s", err)
// }

// // send query
// frame.Write(data)
// if err := fs.SendFrame(frame); err != nil {
// t.Fatalf("send frame error %s", err)
// }
// }

// // waiting message in channel
// msg := <-g.Channel()
// if len(msg.DnsTap.Payload) == 0 {
// t.Errorf("DNStap payload is empty")
// }
// }
Loading

0 comments on commit c550ac0

Please sign in to comment.