Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions admin/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/fabricekabongo/loggerhead/clustering"
"github.com/fabricekabongo/loggerhead/config"
"github.com/hashicorp/memberlist"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand All @@ -35,11 +36,43 @@ func init() {
}

type OpsServer struct {
cluster *clustering.Cluster
cluster Cluster
cfg config.Config
}

func NewOpsServer(cluster *clustering.Cluster, cfg config.Config) *OpsServer {
type Cluster interface {
MemberList() MemberListProvider
Broadcasts() BroadcastQueue
}

type MemberListProvider interface {
LocalNode() *memberlist.Node
NumMembers() int
Members() []*memberlist.Node
GetHealthScore() int
}

type BroadcastQueue interface {
NumQueued() int
}

type clusterAdapter struct {
cluster *clustering.Cluster
}

func NewClusterAdapter(cluster *clustering.Cluster) Cluster {
return clusterAdapter{cluster: cluster}
}

func (c clusterAdapter) MemberList() MemberListProvider {
return c.cluster.MemberList()
}

func (c clusterAdapter) Broadcasts() BroadcastQueue {
return c.cluster.Broadcasts()
}

func NewOpsServer(cluster Cluster, cfg config.Config) *OpsServer {
return &OpsServer{
cluster: cluster,
cfg: cfg,
Expand Down
153 changes: 153 additions & 0 deletions admin/ops_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package admin

import (
"encoding/json"
"net"
"net/http"
"net/http/httptest"
"testing"

"github.com/fabricekabongo/loggerhead/config"
"github.com/hashicorp/memberlist"
)

type mockMemberList struct {
local *memberlist.Node
members []*memberlist.Node
health int
}

func (m *mockMemberList) LocalNode() *memberlist.Node { return m.local }
func (m *mockMemberList) NumMembers() int { return len(m.members) }
func (m *mockMemberList) Members() []*memberlist.Node { return m.members }
func (m *mockMemberList) GetHealthScore() int { return m.health }

type mockQueue struct{ queued int }

func (m mockQueue) NumQueued() int { return m.queued }

type mockCluster struct {
ml MemberListProvider
queue BroadcastQueue
}

func (m mockCluster) MemberList() MemberListProvider { return m.ml }
func (m mockCluster) Broadcasts() BroadcastQueue { return m.queue }

func TestAdminData(t *testing.T) {
localNode := &memberlist.Node{
Name: "node-a",
Addr: net.ParseIP("127.0.0.1"),
State: memberlist.StateAlive,
}

cluster := mockCluster{
ml: &mockMemberList{
local: localNode,
members: []*memberlist.Node{localNode},
health: 2,
},
queue: mockQueue{queued: 3},
}

server := NewOpsServer(cluster, configForTests())

rr := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/admin-data", nil)
server.AdminData().ServeHTTP(rr, req)

if rr.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d", rr.Code)
}

if got := rr.Header().Get("Content-Type"); got != "application/json" {
t.Fatalf("expected application/json content type, got %s", got)
}

var data Data
if err := json.NewDecoder(rr.Body).Decode(&data); err != nil {
t.Fatalf("failed to decode response: %v", err)
}

if data.Name != "node-a" {
t.Fatalf("expected node name to be propagated, got %s", data.Name)
}

if data.Address != "127.0.0.1" {
t.Fatalf("expected address to match node address, got %s", data.Address)
}

if data.NodesAlive != 1 {
t.Fatalf("expected one node alive, got %d", data.NodesAlive)
}

if data.Health != 2 {
t.Fatalf("expected health score from cluster, got %d", data.Health)
}

if data.QueueCount != 3 {
t.Fatalf("expected queue count to be included, got %d", data.QueueCount)
}
}

func TestAdminDataProxySkip(t *testing.T) {
localNode := &memberlist.Node{
Name: "node-a",
Addr: net.ParseIP("127.0.0.1"),
State: memberlist.StateAlive,
}
remoteNode := &memberlist.Node{
Name: "node-b",
Addr: net.ParseIP("127.0.0.2"),
State: memberlist.StateAlive,
}

cluster := mockCluster{
ml: &mockMemberList{
local: localNode,
members: []*memberlist.Node{localNode, remoteNode},
health: 1,
},
queue: mockQueue{queued: 0},
}

server := NewOpsServer(cluster, configForTests())

rr := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/admin-data?proxy=true", nil)
server.AdminData().ServeHTTP(rr, req)

var data Data
if err := json.NewDecoder(rr.Body).Decode(&data); err != nil {
t.Fatalf("failed to decode response: %v", err)
}

if len(data.Others) != 0 {
t.Fatalf("expected proxy mode to skip other members, got %d entries", len(data.Others))
}
}

func TestAdminUI(t *testing.T) {
cluster := mockCluster{
ml: &mockMemberList{local: &memberlist.Node{}, members: []*memberlist.Node{{}}, health: 0},
queue: mockQueue{queued: 0},
}

server := NewOpsServer(cluster, configForTests())

rr := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/", nil)
server.AdminUI().ServeHTTP(rr, req)

if rr.Code != http.StatusOK {
t.Fatalf("expected status 200 from admin UI, got %d", rr.Code)
}

if rr.Body.Len() == 0 {
t.Fatal("expected template to render some content")
}
}

func configForTests() config.Config {
return config.Config{}
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func main() {
ClusterCtx, concel := context.WithCancel(ctx)
clusterEngine := clustering.NewEngineDecorator(ClusterCtx, cluster, writeEngine)

opsServer := admin.NewOpsServer(cluster, cfg)
opsServer := admin.NewOpsServer(admin.NewClusterAdapter(cluster), cfg)
go opsServer.Start()

writer := server.NewListener(cfg.WritePort, cfg.MaxConnections, cfg.MaxEOFWait, clusterEngine) // This is the writer listener (for writes and broadcasts)
Expand Down