Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/dataobj/consumer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func New(kafkaCfg kafka.Config, cfg Config, mCfg metastore.Config, bucket objsto
processorFactory.New,
logger,
prometheus.WrapRegistererWithPrefix("loki_dataobj_consumer_", reg),
partitionInstanceLifecycler,
)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
NewKafkaConsumerFactory(i, registerer, cfg.KafkaIngestion.KafkaConfig.MaxConsumerWorkers),
logger,
registerer,
i.partitionRingLifecycler,
partition.WithHeaderToContextExtractor(validation.IngestionPoliciesKafkaHeadersToContext),
)
if err != nil {
Expand Down
39 changes: 32 additions & 7 deletions pkg/kafka/partition/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -43,6 +44,9 @@ type Reader interface {
// SetPhase sets the phase for the reader. This is used to differentiate between different phases of the reader.
// For example, we can use this to differentiate between the startup phase and the running phase.
SetPhase(phase string)
// SetPartitionState sets the partition ring state for the reader. This is used to track the partition's
// state in the ring (pending, active, inactive) for metrics labeling purposes.
SetPartitionState(state string)
}

// ReaderMetrics contains metrics specific to Kafka reading operations
Expand All @@ -65,7 +69,7 @@ func NewReaderMetrics(r prometheus.Registerer) *ReaderMetrics {
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
Buckets: prometheus.ExponentialBuckets(0.125, 2, 18),
}, []string{"phase"}),
}, []string{"phase", "partition_state"}),
fetchWaitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: client.MetricsPrefix,
Name: "partition_reader_fetch_wait_duration_seconds",
Expand Down Expand Up @@ -99,6 +103,8 @@ type KafkaReader struct {
consumerGroup string
metrics *ReaderMetrics
phase string
partitionStateMu sync.RWMutex
partitionState string
logger log.Logger
headerToContextExtractor func(context.Context, []kgo.RecordHeader) context.Context
}
Expand Down Expand Up @@ -128,11 +134,12 @@ func NewKafkaReader(
}

reader := &KafkaReader{
client: c,
topic: cfg.Topic,
partitionID: partitionID,
metrics: metrics,
logger: logger,
client: c,
topic: cfg.Topic,
partitionID: partitionID,
metrics: metrics,
logger: logger,
partitionState: "unknown",
}

// Apply functional options
Expand All @@ -159,22 +166,40 @@ func (r *KafkaReader) SetPhase(phase string) {
r.phase = phase
}

// SetPartitionState sets the partition ring state for the reader.
func (r *KafkaReader) SetPartitionState(state string) {
r.partitionStateMu.Lock()
defer r.partitionStateMu.Unlock()
r.partitionState = state
}

// Poll retrieves the next batch of records from Kafka
// Number of records fetched can be limited by configuring maxPollRecords to a non-zero value.
func (r *KafkaReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, error) {
start := time.Now()
fetches := r.client.PollRecords(ctx, maxPollRecords)
r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())

// Capture current partition state once for consistent metric labeling
r.partitionStateMu.RLock()
partitionState := r.partitionState
r.partitionStateMu.RUnlock()

// Record metrics
r.metrics.fetchesTotal.Add(float64(len(fetches)))
var numRecords int
fetches.EachRecord(func(record *kgo.Record) {
numRecords++
r.metrics.consumptionLag.WithLabelValues(r.phase).Observe(time.Since(record.Timestamp).Seconds())
r.metrics.consumptionLag.WithLabelValues(r.phase, partitionState).Observe(time.Since(record.Timestamp).Seconds())
})
r.metrics.recordsPerFetch.Observe(float64(numRecords))

// If no records were fetched, observe lag as 0 (caught up)
// This ensures inactive partitions continue reporting metrics
if numRecords == 0 {
r.metrics.consumptionLag.WithLabelValues(r.phase, partitionState).Observe(0)
}

// Handle errors
var errs multierror.MultiError
fetches.EachError(func(topic string, partition int32, err error) {
Expand Down
73 changes: 70 additions & 3 deletions pkg/kafka/partition/reader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -22,6 +23,11 @@ const (
phaseRunning = "running"
)

// StateProvider provides access to partition ring state.
type StateProvider interface {
GetPartitionState(ctx context.Context) (ring.PartitionState, time.Time, error)
}

type ConsumerFactory func(committer Committer, logger log.Logger) (Consumer, error)

type Consumer interface {
Expand Down Expand Up @@ -59,6 +65,7 @@ type ReaderService struct {
metrics *serviceMetrics
committer *partitionCommitter
partitionID int32
stateProvider StateProvider

lastProcessedOffset int64
}
Expand All @@ -77,6 +84,7 @@ func NewReaderService(
consumerFactory ConsumerFactory,
logger log.Logger,
reg prometheus.Registerer,
stateProvider StateProvider,
readerOpts ...ReaderOption,
) (*ReaderService, error) {
readerMetrics := NewReaderMetrics(reg)
Expand Down Expand Up @@ -106,6 +114,7 @@ func NewReaderService(
consumerFactory,
logger,
reg,
stateProvider,
), nil
}

Expand All @@ -117,6 +126,7 @@ func newReaderService(
consumerFactory ConsumerFactory,
logger log.Logger,
reg prometheus.Registerer,
stateProvider StateProvider,
) *ReaderService {
s := &ReaderService{
cfg: cfg,
Expand All @@ -127,6 +137,7 @@ func newReaderService(
logger: log.With(logger, "partition", partitionID, "consumer_group", offsetManager.ConsumerGroup()),
metrics: newServiceMetrics(reg),
lastProcessedOffset: int64(KafkaEndOffset),
stateProvider: stateProvider,
}

// Create the committer
Expand Down Expand Up @@ -175,14 +186,21 @@ func (s *ReaderService) running(ctx context.Context) error {
s.metrics.reportRunning()
s.reader.SetPhase(phaseRunning)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Set initial partition state synchronously before starting polling
if s.stateProvider != nil {
s.updatePartitionState(ctx)
// Start monitoring partition state changes in background
go s.monitorPartitionState(ctx)
}

consumer, err := s.consumerFactory(s.committer, log.With(s.logger, "phase", phaseRunning))
if err != nil {
return fmt.Errorf("creating consumer: %w", err)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

recordsChan := s.startFetchLoop(ctx)
wait := consumer.Start(ctx, recordsChan)
wait()
Expand Down Expand Up @@ -373,3 +391,52 @@ func loggerWithCurrentLagIfSet(logger log.Logger, currentLag time.Duration) log.

return log.With(logger, "current_lag", currentLag)
}

func (s *ReaderService) monitorPartitionState(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.updatePartitionState(ctx)
}
}
}

func (s *ReaderService) updatePartitionState(ctx context.Context) {
state, _, err := s.stateProvider.GetPartitionState(ctx)
if err != nil {
level.Warn(s.logger).Log("msg", "failed to get partition state", "err", err)
s.reader.SetPartitionState("unknown")
return
}

var stateStr string
switch state {
case ring.PartitionActive:
stateStr = "active"
case ring.PartitionInactive:
stateStr = "inactive"
case ring.PartitionPending:
stateStr = "pending"
default:
stateStr = "unknown"
}

// Get current state to detect changes
currentReader, ok := s.reader.(*KafkaReader)
if ok {
currentReader.partitionStateMu.RLock()
oldState := currentReader.partitionState
currentReader.partitionStateMu.RUnlock()

if oldState != stateStr {
level.Info(s.logger).Log("msg", "partition state changed", "old_state", oldState, "new_state", stateStr)
}
}

s.reader.SetPartitionState(stateStr)
}
1 change: 1 addition & 0 deletions pkg/kafka/partition/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func readersFromKafkaCfg(
consumerFactory,
log.NewNopLogger(),
nil,
nil, // stateProvider is optional in tests
)
require.NoError(t, err)

Expand Down
Loading