Skip to content

Commit 473b6e0

Browse files
committed
Adding total attempt metric for SpeculativeRetries
1 parent 0089073 commit 473b6e0

File tree

3 files changed

+65
-17
lines changed

3 files changed

+65
-17
lines changed

conn_test.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,15 @@ func (t *testRetryPolicy) GetRetryType(err error) RetryType {
491491
return Retry
492492
}
493493

494+
// speculativeTestObserver is a simple observer for testing speculativeExecutions execution metrics
495+
type speculativeTestObserver struct {
496+
executions int
497+
}
498+
499+
func (o *speculativeTestObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
500+
o.executions = q.SpeculativeExecutions
501+
}
502+
494503
func TestSpeculativeExecution(t *testing.T) {
495504
log := newTestLogger(LogLevelDebug)
496505
defer func() {
@@ -523,8 +532,11 @@ func TestSpeculativeExecution(t *testing.T) {
523532
// test Speculative policy with 1 additional execution
524533
sp := &SimpleSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond}
525534

535+
// Add an observer to capture speculativeExecutions execution metrics
536+
observer := &speculativeTestObserver{}
537+
526538
// Build the query
527-
qry := db.Query("speculative").RetryPolicy(rt).SetSpeculativeExecutionPolicy(sp).Idempotent(true)
539+
qry := db.Query("speculative").RetryPolicy(rt).SetSpeculativeExecutionPolicy(sp).Idempotent(true).Observer(observer)
528540

529541
// Execute the query and close, check that it doesn't error out
530542
if err := qry.Exec(); err != nil {
@@ -549,6 +561,12 @@ func TestSpeculativeExecution(t *testing.T) {
549561
if requests1+requests2+requests3 > 6 {
550562
t.Errorf("error: expected to see 6 attempts, got %v\n", requests1+requests2+requests3)
551563
}
564+
565+
// Verify that the observer captured speculativeExecutions execution attempts
566+
// With NumAttempts: 1, we expect 1 speculativeExecutions attempt (in addition to the main execution)
567+
if observer.executions != 1 {
568+
t.Errorf("expected observer to capture 1 speculativeExecutions attempt, got %d", observer.executions)
569+
}
552570
}
553571

554572
// This tests that the policy connection pool handles SSL correctly

query_executor.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type internalRequest interface {
6161
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
6262
retryPolicy() RetryPolicy
6363
speculativeExecutionPolicy() SpeculativeExecutionPolicy
64+
speculativeExecutionStarted() // Used to update speculative execution count
6465
getQueryMetrics() *queryMetrics
6566
getRoutingInfo() *queryRoutingInfo
6667
getKeyspaceFunc() func() string
@@ -91,6 +92,8 @@ func (q *queryExecutor) speculate(ctx context.Context, qry internalRequest, sp S
9192
for i := 0; i < sp.Attempts(); i++ {
9293
select {
9394
case <-ticker.C:
95+
// Increment speculative count in metrics so it's available to the observer
96+
qry.speculativeExecutionStarted()
9497
go q.run(ctx, qry, hostIter, results)
9598
case <-ctx.Done():
9699
return newErrIter(ctx.Err(), qry.getQueryMetrics(), qry.Keyspace(), qry.getRoutingInfo(), qry.getKeyspaceFunc())
@@ -383,17 +386,18 @@ func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter *Ite
383386
if q.qryOpts.observer != nil {
384387
metricsForHost := q.hostMetricsManager.attempt(latency, host)
385388
q.qryOpts.observer.ObserveQuery(q.qryOpts.context, ObservedQuery{
386-
Keyspace: keyspace,
387-
Statement: q.qryOpts.stmt,
388-
Values: q.qryOpts.values,
389-
Start: start,
390-
End: end,
391-
Rows: iter.numRows,
392-
Host: host,
393-
Metrics: metricsForHost,
394-
Err: iter.err,
395-
Attempt: attempt,
396-
Query: q.originalQuery,
389+
Keyspace: keyspace,
390+
Statement: q.qryOpts.stmt,
391+
Values: q.qryOpts.values,
392+
Start: start,
393+
End: end,
394+
Rows: iter.numRows,
395+
Host: host,
396+
Metrics: metricsForHost,
397+
Err: iter.err,
398+
Attempt: attempt,
399+
Query: q.originalQuery,
400+
SpeculativeExecutions: q.metrics.speculativeExecutions(),
397401
})
398402
}
399403
}
@@ -410,6 +414,10 @@ func (q *internalQuery) speculativeExecutionPolicy() SpeculativeExecutionPolicy
410414
return q.qryOpts.spec
411415
}
412416

417+
func (q *internalQuery) speculativeExecutionStarted() {
418+
q.metrics.speculativeExecution()
419+
}
420+
413421
func (q *internalQuery) GetRoutingKey() ([]byte, error) {
414422
if q.qryOpts.routingKey != nil {
415423
return q.qryOpts.routingKey, nil
@@ -612,11 +620,12 @@ func (b *internalBatch) attempt(keyspace string, end, start time.Time, iter *Ite
612620
Start: start,
613621
End: end,
614622
// Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
615-
Host: host,
616-
Metrics: metricsForHost,
617-
Err: iter.err,
618-
Attempt: attempt,
619-
Batch: b.originalBatch,
623+
Host: host,
624+
Metrics: metricsForHost,
625+
Err: iter.err,
626+
Attempt: attempt,
627+
Batch: b.originalBatch,
628+
SpeculativeExecutions: b.metrics.speculativeExecutions(),
620629
})
621630
}
622631

@@ -628,6 +637,10 @@ func (b *internalBatch) speculativeExecutionPolicy() SpeculativeExecutionPolicy
628637
return b.batchOpts.spec
629638
}
630639

640+
func (b *internalBatch) speculativeExecutionStarted() {
641+
b.metrics.speculativeExecution()
642+
}
643+
631644
func (b *internalBatch) GetRoutingKey() ([]byte, error) {
632645
if b.batchOpts.routingKey != nil {
633646
return b.batchOpts.routingKey, nil

session.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -909,6 +909,8 @@ type hostMetrics struct {
909909
type queryMetrics struct {
910910
totalAttempts int64
911911
totalLatency int64
912+
// totalSpeculativeExecutions is the number of speculative executions launched for this query.
913+
totalSpeculativeExecutions int64
912914
}
913915

914916
func (qm *queryMetrics) attempt(addLatency time.Duration) int {
@@ -920,6 +922,15 @@ func (qm *queryMetrics) attempts() int {
920922
return int(atomic.LoadInt64(&qm.totalAttempts))
921923
}
922924

925+
func (qm *queryMetrics) speculativeExecutions() int {
926+
return int(atomic.LoadInt64(&qm.totalSpeculativeExecutions))
927+
}
928+
929+
// increments the speculative execution count.
930+
func (qm *queryMetrics) speculativeExecution() {
931+
atomic.AddInt64(&qm.totalSpeculativeExecutions, 1)
932+
}
933+
923934
func (qm *queryMetrics) latency() int64 {
924935
attempts := atomic.LoadInt64(&qm.totalAttempts)
925936
if attempts == 0 {
@@ -2324,6 +2335,9 @@ type ObservedQuery struct {
23242335
// The first attempt is number zero and any retries have non-zero attempt number.
23252336
Attempt int
23262337

2338+
// SpeculativeExecutions is the number of speculative executions launched
2339+
SpeculativeExecutions int
2340+
23272341
// Query object associated with this request. Should be used as read only.
23282342
Query *Query
23292343
}
@@ -2364,6 +2378,9 @@ type ObservedBatch struct {
23642378
// The first attempt is number zero and any retries have non-zero attempt number.
23652379
Attempt int
23662380

2381+
// SpeculativeExecutions is the number of speculative executions launched
2382+
SpeculativeExecutions int
2383+
23672384
// Batch object associated with this request. Should be used as read only.
23682385
Batch *Batch
23692386
}

0 commit comments

Comments
 (0)