5
5
"log/slog"
6
6
"math/rand" // nosemgrep
7
7
8
- "github.com/resonatehq/resonate/internal/kernel/metadata"
9
8
"github.com/resonatehq/resonate/internal/kernel/t_aio"
10
9
"github.com/resonatehq/resonate/internal/metrics"
11
10
@@ -14,20 +13,18 @@ import (
14
13
)
15
14
16
15
type aioDST struct {
17
- r * rand.Rand
18
- sqes []* bus.SQE [t_aio.Submission , t_aio.Completion ]
19
- cqes []* bus.CQE [t_aio.Submission , t_aio.Completion ]
20
- subsystems map [t_aio.Kind ]Subsystem
21
- metrics * metrics.Metrics
22
- failureProbability float64
16
+ r * rand.Rand
17
+ sqes []* bus.SQE [t_aio.Submission , t_aio.Completion ]
18
+ cqes []* bus.CQE [t_aio.Submission , t_aio.Completion ]
19
+ subsystems map [t_aio.Kind ]Subsystem
20
+ metrics * metrics.Metrics
23
21
}
24
22
25
- func NewDST (r * rand.Rand , metrics * metrics.Metrics , failureProbability float64 ) * aioDST {
23
+ func NewDST (r * rand.Rand , metrics * metrics.Metrics ) * aioDST {
26
24
return & aioDST {
27
- r : r ,
28
- subsystems : map [t_aio.Kind ]Subsystem {},
29
- metrics : metrics ,
30
- failureProbability : failureProbability ,
25
+ r : r ,
26
+ subsystems : map [t_aio.Kind ]Subsystem {},
27
+ metrics : metrics ,
31
28
}
32
29
}
33
30
@@ -70,6 +67,8 @@ func (a *aioDST) Dequeue(n int) []*bus.CQE[t_aio.Submission, t_aio.Completion] {
70
67
a .cqes = a .cqes [min (n , len (a .cqes )):]
71
68
72
69
for _ , cqe := range cqes {
70
+ slog .Debug ("aio:dequeue" , "cqe" , cqe )
71
+
73
72
var status string
74
73
if cqe .Error != nil {
75
74
status = "failure"
@@ -93,30 +92,7 @@ func (a *aioDST) Flush(t int64) {
93
92
94
93
for _ , sqes := range util .OrderedRangeKV (flush ) {
95
94
if subsystem , ok := a .subsystems [sqes .Key ]; ok {
96
- // Randomly decide whether to process SQE or return an error
97
- if a .r .Float64 () < a .failureProbability {
98
- // Create a new CQE
99
- errorCqe := createErrorCQE ("aio dst error occurred while processing SQE" , "aio dst failure" , simpleCallback )
100
- errorCqe .Completion = & t_aio.Completion {}
101
-
102
- slog .Debug ("aio dst: failure" , "err" , errorCqe )
103
- a .cqes = append (a .cqes , errorCqe )
104
- } else {
105
- // Do the I/O
106
- processedCQEs := subsystem .NewWorker (0 ).Process (sqes .Value )
107
- // Randomly decide whether to return an error after processing SQE
108
- if a .r .Float64 () < a .failureProbability {
109
- for _ , cqe := range processedCQEs {
110
- errorCqe := createErrorCQE ("aio dst error occurred after processing SQE" , "aio dst failure" , simpleCallback )
111
- errorCqe .Completion = & t_aio.Completion {}
112
-
113
- slog .Debug ("aio dst: failure" , "err" , errorCqe , "cqe" , cqe )
114
- a .cqes = append (a .cqes , cqe , errorCqe )
115
- }
116
- } else {
117
- a .cqes = append (a .cqes , processedCQEs ... )
118
- }
119
- }
95
+ a .cqes = append (a .cqes , subsystem .NewWorker (0 ).Process (sqes .Value )... )
120
96
} else {
121
97
panic ("invalid aio submission" )
122
98
}
@@ -125,19 +101,6 @@ func (a *aioDST) Flush(t int64) {
125
101
a .sqes = nil
126
102
}
127
103
128
- func simpleCallback (completion * t_aio.Completion , err error ) {
129
- }
130
-
131
- func createErrorCQE (errorMsg string , tags string , callback func (* t_aio.Completion , error )) * bus.CQE [t_aio.Submission , t_aio.Completion ] {
132
- errorCqe := & bus.CQE [t_aio.Submission , t_aio.Completion ]{
133
- Error : fmt .Errorf (errorMsg ),
134
- Metadata : metadata .New ("dst error" ),
135
- }
136
- errorCqe .Metadata .Tags .Set ("aio" , tags )
137
- errorCqe .Callback = callback
138
- return errorCqe
139
- }
140
-
141
104
func (a * aioDST ) String () string {
142
105
// use subsystem keys so that we can compare cross-store dst runs
143
106
subsystems := make ([]t_aio.Kind , len (a .subsystems ))
0 commit comments