Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add readiness endpoint #15

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
IMAGE_NAME ?= linuxptp-daemon-image
IMAGE_TAG_BASE ?= ghcr.io/k8snetworkplumbingwg/${IMAGE_NAME}
VERSION ?=latest
IMG ?= $(IMAGE_TAG_BASE):$(VERSION)
CONTAINER_TOOL ?=docker

.PHONY: test
default:
./hack/build.sh
image:
./hack/build-image.sh
push:
docker push ghcr.io/k8snetworkplumbingwg/linuxptp-daemon-image:latest
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

${CONTAINER_TOOL} push ${IMG}
clean:
./hack/cleanup.sh
fmt:
Expand All @@ -14,4 +20,4 @@ test:
go test ./... --tags=unittests -coverprofile=cover.out

lint:
golangci-lint run
golangci-lint run
6 changes: 6 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func main() {
go lm.Run()

defer close(lm.Close)

tracker := &daemon.ReadyTracker{}

go daemon.New(
nodeName,
daemon.PtpNamespace,
Expand All @@ -130,6 +133,7 @@ func main() {
&refreshNodePtpDevice,
closeProcessManager,
cp.pmcPollInterval,
tracker,
).Run()

tickerPull := time.NewTicker(time.Second * time.Duration(cp.updateInterval))
Expand All @@ -143,6 +147,8 @@ func main() {
daemon.StartMetricsServer("0.0.0.0:9091")
}

daemon.StartReadyServer("0.0.0.0:8081", tracker)

for {
select {
case <-tickerPull.C:
Expand Down
8 changes: 7 additions & 1 deletion hack/build-image.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
#!/bin/bash

# Build the linuxptp-daemon container image. All parameters can be overridden
# from the environment (mirroring the variables used by the Makefile):
#   CONTAINER_TOOL  - container build tool (default: docker)
#   IMAGE_NAME      - image name (default: linuxptp-daemon-image)
#   IMAGE_TAG_BASE  - registry/repository prefix for the image
#   VERSION         - image tag (default: latest)
CONTAINER_TOOL="${CONTAINER_TOOL:-docker}"
IMAGE_NAME="${IMAGE_NAME:-linuxptp-daemon-image}"
# NOTE: ${IMAGE_NAME} must be expanded here; a literal "IMAGE_NAME" would
# produce a broken default image reference and ignore IMAGE_NAME overrides.
IMAGE_TAG_BASE="${IMAGE_TAG_BASE:-ghcr.io/k8snetworkplumbingwg/${IMAGE_NAME}}"
VERSION="${VERSION:-latest}"
IMG="${IMAGE_TAG_BASE}:${VERSION}"

$CONTAINER_TOOL build -t "${IMG}" -f ./Dockerfile .
77 changes: 49 additions & 28 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ func NewProcessManager() *ProcessManager {
}
}

// NewDaemonForTests builds a minimal *Daemon for unit tests. It wires the
// supplied ProcessManager into both the Daemon and the ReadyTracker, so
// readiness checks observe the same process list the daemon manages. It
// performs none of the initialization done by New (metrics registration,
// plugins, event handler), so only process/readiness behavior is exercised.
func NewDaemonForTests(tracker *ReadyTracker, processManager *ProcessManager) *Daemon {
	tracker.processManager = processManager
	return &Daemon{
		readyTracker:   tracker,
		processManager: processManager,
	}
}

// SetTestProfileProcess ...
func (p *ProcessManager) SetTestProfileProcess(name string, ifaces config.IFaces, socketPath,
processConfigPath string, nodeProfile ptpv1.PtpProfile) {
Expand Down Expand Up @@ -151,27 +160,28 @@ func (p *ProcessManager) UpdateSynceConfig(config *synce.Relations) {
}

type ptpProcess struct {
name string
ifaces config.IFaces
processSocketPath string
processConfigPath string
configName string
messageTag string
eventCh chan event.EventChannel
exitCh chan bool
execMutex sync.Mutex
stopped bool
logFilterRegex string
cmd *exec.Cmd
depProcess []process // these are list of dependent process which needs to be started/stopped if the parent process is starts/stops
nodeProfile ptpv1.PtpProfile
parentClockClass float64
pmcCheck bool
clockType event.ClockType
ptpClockThreshold *ptpv1.PtpClockThreshold
haProfile map[string][]string // stores list of interface name for each profile
syncERelations *synce.Relations
c *net.Conn
name string
ifaces config.IFaces
processSocketPath string
processConfigPath string
configName string
messageTag string
eventCh chan event.EventChannel
exitCh chan bool
execMutex sync.Mutex
stopped bool
logFilterRegex string
cmd *exec.Cmd
depProcess []process // these are list of dependent process which needs to be started/stopped if the parent process is starts/stops
nodeProfile ptpv1.PtpProfile
parentClockClass float64
pmcCheck bool
clockType event.ClockType
ptpClockThreshold *ptpv1.PtpClockThreshold
haProfile map[string][]string // stores list of interface name for each profile
syncERelations *synce.Relations
c *net.Conn
hasCollectedMetrics bool
}

func (p *ptpProcess) Stopped() bool {
Expand Down Expand Up @@ -202,6 +212,7 @@ type Daemon struct {
ptpUpdate *LinuxPTPConfUpdate

processManager *ProcessManager
readyTracker *ReadyTracker

hwconfigs *[]ptpv1.HwConfig

Expand Down Expand Up @@ -230,13 +241,20 @@ func New(
refreshNodePtpDevice *bool,
closeManager chan bool,
pmcPollInterval int,
tracker *ReadyTracker,
) *Daemon {
if !stdoutToSocket {
RegisterMetrics(nodeName)
}
InitializeOffsetMaps()
pluginManager := registerPlugins(plugins)
eventChannel := make(chan event.EventChannel, 100)
pm := &ProcessManager{
process: nil,
eventChannel: eventChannel,
ptpEventHandler: event.Init(nodeName, stdoutToSocket, eventSocket, eventChannel, closeManager, Offset, ClockState, ClockClassMetrics),
}
tracker.processManager = pm
return &Daemon{
nodeName: nodeName,
namespace: namespace,
Expand All @@ -248,12 +266,9 @@ func New(
refreshNodePtpDevice: refreshNodePtpDevice,
pmcPollInterval: pmcPollInterval,
//TODO:Enable only for GM
processManager: &ProcessManager{
process: nil,
eventChannel: eventChannel,
ptpEventHandler: event.Init(nodeName, stdoutToSocket, eventSocket, eventChannel, closeManager, Offset, ClockState, ClockClassMetrics),
},
stopCh: stopCh,
processManager: pm,
readyTracker: tracker,
stopCh: stopCh,
}
}

Expand Down Expand Up @@ -305,9 +320,10 @@ func printWhenNotNil(p interface{}, description string) {
}
}

// SetProcessManager ...
// SetProcessManager in tests
func (dn *Daemon) SetProcessManager(p *ProcessManager) {
dn.processManager = p
dn.readyTracker.processManager = p
}

// Delete all socket and config files
Expand All @@ -329,6 +345,8 @@ func (dn *Daemon) cleanupTempFiles() error {
}

func (dn *Daemon) applyNodePTPProfiles() error {
dn.readyTracker.setConfig(false)

glog.Infof("in applyNodePTPProfiles")
for _, p := range dn.processManager.process {
if p != nil {
Expand All @@ -343,6 +361,7 @@ func (dn *Daemon) applyNodePTPProfiles() error {
}
}
p.depProcess = nil
p.hasCollectedMetrics = false
//cleanup metrics
deleteMetrics(p.ifaces, p.haProfile, p.name, p.configName)
if p.name == syncEProcessName && p.syncERelations != nil {
Expand Down Expand Up @@ -427,6 +446,7 @@ func (dn *Daemon) applyNodePTPProfiles() error {
}
dn.pluginManager.PopulateHwConfig(dn.hwconfigs)
*dn.refreshNodePtpDevice = true
dn.readyTracker.setConfig(true)
return nil
}

Expand Down Expand Up @@ -1009,6 +1029,7 @@ func (p *ptpProcess) processPTPMetrics(output string) {
p.ProcessSynceEvents(logEntry)
} else {
configName, source, ptpOffset, clockState, iface := extractMetrics(p.messageTag, p.name, p.ifaces, output)
p.hasCollectedMetrics = true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you would know there are metrics only if you have ptpOffset; I think that could be a good start. But if events are enabled then extractMetrics is moved to cloud-event-proxy and it will not enter this condition.

if iface != "" { // for ptp4l/phc2sys this function only update metrics
var values map[event.ValueType]interface{}
ifaceName := masterOffsetIface.getByAlias(configName, iface).name
Expand Down
1 change: 1 addition & 0 deletions pkg/daemon/daemon_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func applyProfileSyncE(t *testing.T, profile *ptpv1.PtpProfile) {
nil,
make(chan bool),
30,
&ReadyTracker{},
)
assert.NotNil(t, dn)
err := dn.applyNodePtpProfile(0, profile)
Expand Down
3 changes: 1 addition & 2 deletions pkg/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,7 @@ func TestDaemon_ApplyHaProfiles(t *testing.T) {
processManager.SetTestProfileProcess(*p1.Name, ifaces1, "socket1", "config1", p1)
processManager.SetTestProfileProcess(*p2.Name, ifaces2, "socket2", "config1", p2)
processManager.SetTestProfileProcess(*p3.Name, nil, "", "config1", p3)
dd := &daemon.Daemon{}
dd.SetProcessManager(processManager)
dd := daemon.NewDaemonForTests(&daemon.ReadyTracker{}, processManager)
haProfiles, cmdLine := dd.ApplyHaProfiles(&p3, "")
assert.NotEmpty(t, cmdLine, "cmdLine is not empty")
assert.Equal(t, len(haProfiles), 2, "ha has two profiles")
Expand Down
4 changes: 2 additions & 2 deletions pkg/daemon/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package daemon

import (
"fmt"
"github.com/k8snetworkplumbingwg/linuxptp-daemon/pkg/synce"
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/k8snetworkplumbingwg/linuxptp-daemon/pkg/synce"

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/collectors"

Expand Down Expand Up @@ -723,7 +724,6 @@ func addFlagsForMonitor(process string, configOpts *string, conf *ptp4lConf, std
func StartMetricsServer(bindAddress string) {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())

go utilwait.Until(func() {
err := http.ListenAndServe(bindAddress, mux)
if err != nil {
Expand Down
89 changes: 89 additions & 0 deletions pkg/daemon/ready.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package daemon

import (
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/golang/glog"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"
)

// ReadyTracker aggregates the state needed to answer the readiness probe:
// whether a node PTP config has been applied (toggled via setConfig) and the
// ProcessManager whose processes are inspected for liveness and metrics
// collection. mutex guards both fields; the tracker is shared between the
// HTTP ready handler and the daemon goroutine.
type ReadyTracker struct {
	mutex          sync.Mutex
	config         bool
	processManager *ProcessManager
}

// Ready reports whether the daemon is ready: a PTP config has been applied,
// at least one process has started, every process is running, and every
// process has collected metrics at least once. The second return value is a
// human-readable reason when not ready, and empty otherwise.
func (rt *ReadyTracker) Ready() (bool, string) {
	rt.mutex.Lock()
	defer rt.mutex.Unlock()

	if !rt.config {
		return false, "Config not applied"
	}

	// Guard against a tracker that was never wired to a ProcessManager
	// (e.g. a zero-value ReadyTracker in tests) in addition to the
	// no-processes-yet case.
	if rt.processManager == nil || len(rt.processManager.process) == 0 {
		return false, "No processes have started"
	}

	var notRunning, noMetrics []string
	for _, p := range rt.processManager.process {
		switch {
		case p.Stopped():
			notRunning = append(notRunning, p.name)
		case !p.hasCollectedMetrics:
			noMetrics = append(noMetrics, p.name)
		}
	}

	if len(notRunning) > 0 {
		return false, "Stopped process(es): " + strings.Join(notRunning, ", ")
	}

	if len(noMetrics) > 0 {
		return false, "Process(es) have not yet collected metrics: " + strings.Join(noMetrics, ", ")
	}

	return true, ""
}

// setConfig records whether a node PTP profile is currently applied. It is
// switched off at the start of applyNodePTPProfiles and back on once the
// profiles have been applied.
func (rt *ReadyTracker) setConfig(v bool) {
	rt.mutex.Lock()
	defer rt.mutex.Unlock()
	rt.config = v
}

// readyHandler serves the /ready endpoint by delegating to a ReadyTracker.
type readyHandler struct {
	tracker *ReadyTracker
}

// ServeHTTP answers the readiness probe: 200 when the tracker reports ready,
// otherwise 503 with the tracker's explanatory message in the body.
func (h readyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	isReady, msg := h.tracker.Ready()
	if isReady {
		w.WriteHeader(http.StatusOK)
		return
	}
	w.WriteHeader(http.StatusServiceUnavailable)
	fmt.Fprintf(w, "503: %s\n", msg)
}

// StartReadyServer exposes the /ready endpoint on bindAddress, backed by the
// given tracker. The listener runs in a background goroutine and, if
// ListenAndServe ever returns, is restarted every 5 seconds (NeverStop).
func StartReadyServer(bindAddress string, tracker *ReadyTracker) {
	glog.Info("Starting Ready Server")
	mux := http.NewServeMux()
	mux.Handle("/ready", readyHandler{tracker: tracker})
	go utilwait.Until(func() {
		err := http.ListenAndServe(bindAddress, mux)
		if err != nil {
			// Fixed: this previously reported "metrics server", which is
			// misleading — this is the readiness server.
			utilruntime.HandleError(fmt.Errorf("starting ready server failed: %v", err))
		}
	}, 5*time.Second, utilwait.NeverStop)
}