Skip to content

Commit 73c33b9

Browse files
committed
Revert to disk based queue, with batching
1 parent 3a15318 commit 73c33b9

File tree

2 files changed

+95
-49
lines changed

2 files changed

+95
-49
lines changed

provider/internal/queue/queue.go

+93-29
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ import (
1818
var log = logging.Logger("provider.queue")
1919

2020
const (
21+
batchSize = 1024
22+
batchCommitInterval = 5 * time.Second
23+
2124
// Number of input CIDs to buffer without blocking.
22-
inputBufferSize = 65536
25+
inputBufferSize = 1024 * 256
2326
// Time for Close to wait to finish writing CIDs to datastore.
24-
shutdownTimeout = 5 * time.Second
27+
shutdownTimeout = 30 * time.Second
2528
)
2629

2730
// Queue provides a FIFO interface to the datastore for storing cids.
@@ -36,46 +39,49 @@ const (
3639
type Queue struct {
3740
// used to differentiate queues in datastore
3841
// e.g. provider vs reprovider
39-
ds datastore.Datastore // Must be threadsafe
42+
ds datastore.Batching
4043
dequeue chan cid.Cid
4144
enqueue chan cid.Cid
4245
inBuf *chanqueue.ChanQueue[cid.Cid] // in-memory queue to buffer input
4346
close context.CancelFunc
4447
closed chan struct{}
4548
closeOnce sync.Once
49+
50+
syncDone chan struct{}
51+
syncMutex sync.Mutex
4652
}
4753

4854
// NewQueue creates a queue for cids
49-
func NewQueue(ds datastore.Datastore) *Queue {
55+
func NewQueue(ds datastore.Batching) *Queue {
5056
dequeue := make(chan cid.Cid)
5157
enqueue := make(chan cid.Cid)
5258

59+
ctx, cancel := context.WithCancel(context.Background())
5360
q := &Queue{
54-
dequeue: dequeue,
55-
enqueue: enqueue,
61+
close: cancel,
62+
closed: make(chan struct{}),
63+
ds: namespace.Wrap(ds, datastore.NewKey("/queue")),
64+
dequeue: dequeue,
65+
enqueue: enqueue,
66+
syncDone: make(chan struct{}, 1),
5667
}
5768

58-
if ds == nil {
59-
q.inBuf = chanqueue.New(
60-
chanqueue.WithInput(enqueue),
61-
chanqueue.WithOutput(dequeue),
62-
chanqueue.WithCapacity[cid.Cid](inputBufferSize),
63-
)
64-
} else {
65-
ctx, cancel := context.WithCancel(context.Background())
66-
q.close = cancel
67-
q.inBuf = chanqueue.New(
68-
chanqueue.WithInput(enqueue),
69-
chanqueue.WithCapacity[cid.Cid](inputBufferSize),
70-
)
71-
q.ds = namespace.Wrap(ds, datastore.NewKey("/queue"))
72-
q.closed = make(chan struct{})
73-
go q.worker(ctx)
74-
}
69+
q.inBuf = chanqueue.New(
70+
chanqueue.WithInput(enqueue),
71+
chanqueue.WithCapacity[cid.Cid](inputBufferSize),
72+
)
73+
go q.worker(ctx)
7574

7675
return q
7776
}
7877

78+
func (q *Queue) Sync() {
79+
q.syncMutex.Lock()
80+
q.inBuf.In() <- cid.Undef
81+
<-q.syncDone
82+
q.syncMutex.Unlock()
83+
}
84+
7985
// Close stops the queue
8086
func (q *Queue) Close() error {
8187
var err error
@@ -117,6 +123,7 @@ func (q *Queue) Dequeue() <-chan cid.Cid {
117123
// worker run dequeues and enqueues when available.
118124
func (q *Queue) worker(ctx context.Context) {
119125
defer close(q.closed)
126+
defer q.inBuf.Shutdown()
120127

121128
var (
122129
c cid.Cid = cid.Undef
@@ -126,7 +133,27 @@ func (q *Queue) worker(ctx context.Context) {
126133
)
127134
readInBuf := q.inBuf.Out()
128135

136+
var batchCount int
137+
b, err := q.ds.Batch(ctx)
138+
if err != nil {
139+
log.Errorf("Failed to create batch, stopping provider: %s", err)
140+
return
141+
}
142+
143+
defer func() {
144+
if batchCount != 0 {
145+
if err := b.Commit(ctx); err != nil {
146+
log.Errorf("Failed to write cid batch: %s", err)
147+
}
148+
}
149+
close(q.syncDone)
150+
}()
151+
152+
batchTicker := time.NewTicker(batchCommitInterval)
153+
defer batchTicker.Stop()
154+
129155
for {
156+
//fmt.Println("---> inbuf len:", q.inBuf.Len(), "batch count:", batchCount)
130157
if c == cid.Undef {
131158
head, err := q.getQueueHead(ctx)
132159
if err != nil {
@@ -153,12 +180,20 @@ func (q *Queue) worker(ctx context.Context) {
153180
if c != cid.Undef {
154181
dequeue = q.dequeue
155182
}
183+
var commit, needSync bool
156184

157185
select {
158186
case toQueue, ok := <-readInBuf:
159187
if !ok {
160188
return
161189
}
190+
if toQueue == cid.Undef {
191+
if batchCount != 0 {
192+
commit = true
193+
}
194+
needSync = true
195+
break
196+
}
162197
// Add suffix to key path to allow multiple entries with same sequence.
163198
nextKey := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr))
164199
counter++
@@ -170,24 +205,53 @@ func (q *Queue) worker(ctx context.Context) {
170205
cstr = c.String()
171206
}
172207

173-
if err := q.ds.Put(ctx, nextKey, toQueue.Bytes()); err != nil {
174-
log.Errorf("Failed to enqueue cid: %s", err)
208+
//if err := q.ds.Put(ctx, nextKey, toQueue.Bytes()); err != nil {
209+
if err := b.Put(ctx, nextKey, toQueue.Bytes()); err != nil {
210+
log.Errorf("Failed to batch cid: %s", err)
175211
continue
176212
}
213+
batchCount++
214+
if batchCount == batchSize {
215+
commit = true
216+
}
217+
case <-batchTicker.C:
218+
if batchCount != 0 && q.inBuf.Len() == 0 {
219+
commit = true
220+
}
177221
case dequeue <- c:
222+
// Do not batch delete. Delete must be committed immediately, otherwise the same head cid will be read from the datastore.
178223
err := q.ds.Delete(ctx, k)
179224
if err != nil {
180225
log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
181226
continue
182227
}
183228
c = cid.Undef
184229
case <-ctx.Done():
185-
if q.inBuf != nil {
186-
for range readInBuf {
187-
}
188-
}
189230
return
190231
}
232+
233+
if commit {
234+
if err = b.Commit(ctx); err != nil {
235+
log.Errorf("Failed to write cid batch, stopping provider: %s", err)
236+
return
237+
}
238+
b, err = q.ds.Batch(ctx)
239+
if err != nil {
240+
log.Errorf("Failed to create batch, stopping provider: %s", err)
241+
return
242+
}
243+
batchCount = 0
244+
commit = false
245+
}
246+
247+
if needSync {
248+
needSync = false
249+
select {
250+
case q.syncDone <- struct{}{}:
251+
case <-ctx.Done():
252+
return
253+
}
254+
}
191255
}
192256
}
193257

provider/internal/queue/queue_test.go

+2-20
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313

1414
func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) {
1515
t.Helper()
16+
17+
q.Sync()
1618
for i, c := range cids {
1719
select {
1820
case dequeued, ok := <-q.dequeue:
@@ -50,26 +52,6 @@ func TestBasicOperation(t *testing.T) {
5052
}
5153
}
5254

53-
func TestBasicOperationNoDS(t *testing.T) {
54-
queue := NewQueue(nil)
55-
defer queue.Close()
56-
57-
cids := random.Cids(10)
58-
for _, c := range cids {
59-
queue.Enqueue(c)
60-
}
61-
62-
assertOrdered(cids, queue, t)
63-
64-
err := queue.Close()
65-
if err != nil {
66-
t.Fatal(err)
67-
}
68-
if err = queue.Close(); err != nil {
69-
t.Fatal(err)
70-
}
71-
}
72-
7355
func TestMangledData(t *testing.T) {
7456
ds := sync.MutexWrap(datastore.NewMapDatastore())
7557
queue := NewQueue(ds)

0 commit comments

Comments
 (0)