Skip to content

Commit d2b987e

Browse files
committed
Better failure reporting from queue
1 parent 5a022d5 commit d2b987e

File tree

9 files changed

+60
-200
lines changed

9 files changed

+60
-200
lines changed

examples/cmd/markdownRenderer/store/queue.go

Lines changed: 0 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -224,35 +224,6 @@ func (conn *store) QueueGroupComplete(id int64) (done bool, cancelled bool, err
224224
return count == 0, cancelled, err
225225
}
226226

227-
func (conn *store) IsQueueAddressComplete(address string) (done bool, err error) {
228-
address = strings.TrimSpace(address)
229-
if address == "" {
230-
return false, errors.New("no address provided for IsQueueAddressComplete")
231-
}
232-
233-
// Include the address in the transaction description to avoid races when
234-
// testing with a special locking connection.
235-
var count int64
236-
var workErr error
237-
conn.db.Transaction(func(tx *gorm.DB) error {
238-
err = tx.Model(&Queue{}).Where("address = ?", address).Count(&count).Error
239-
if err != nil {
240-
return err
241-
}
242-
243-
c2 := &store{
244-
db: tx,
245-
}
246-
workErr = c2.QueueAddressedCheck(address)
247-
return nil
248-
})
249-
250-
if workErr != nil {
251-
err = workErr
252-
}
253-
return count == 0, err
254-
}
255-
256227
func (conn *store) IsQueueAddressInProgress(address string) (bool, error) {
257228
address = strings.TrimSpace(address)
258229
if address == "" {
@@ -568,67 +539,6 @@ func (conn *store) QueueDelete(permitId permit.Permit) (err error) {
568539
})
569540
}
570541

571-
func (conn *store) QueueAddressedComplete(address string, failure error) (err error) {
572-
573-
var bytes []byte
574-
if failure != nil {
575-
queueError, isQueueError := failure.(*queue.QueueError)
576-
if isQueueError {
577-
// Record type of queue.QueueError
578-
bytes, err = json.Marshal(queueError)
579-
} else {
580-
// Record generic error
581-
bytes, err = json.Marshal(&queue.QueueError{
582-
Message: failure.Error(),
583-
})
584-
}
585-
}
586-
// Return if there were any marshaling errors
587-
if err != nil {
588-
return err
589-
}
590-
591-
var postgres bool
592-
593-
err = conn.db.Transaction(func(tx *gorm.DB) error {
594-
if postgres {
595-
// Lock to prevent race. Don't allow reading until this operation is complete.
596-
err = tx.Exec("LOCK queue_failure IN ACCESS EXCLUSIVE MODE").Error
597-
if err != nil {
598-
return err
599-
}
600-
}
601-
602-
// First, delete any references to this address
603-
err = tx.Delete(&QueueFailure{}, "address = ?", address).Error
604-
if err != nil {
605-
return err
606-
}
607-
608-
// Next, if there is an error to report, insert it
609-
if failure != nil {
610-
failure := QueueFailure{
611-
Address: address,
612-
Error: string(bytes),
613-
}
614-
err = tx.Create(&failure).Error
615-
if err != nil {
616-
return err
617-
}
618-
}
619-
return nil
620-
})
621-
return
622-
}
623-
624-
type QueueAddressFailure struct {
625-
Message string `json:"error"`
626-
}
627-
628-
func (err *QueueAddressFailure) Error() string {
629-
return err.Message
630-
}
631-
632542
func (conn *store) QueueAddressedCheck(address string) error {
633543
var failure QueueFailure
634544
err := conn.db.First(&failure, "address = ?", address).Error

examples/cmd/markdownRenderer/store/queue_test.go

Lines changed: 22 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package store
55
import (
66
"database/sql"
77
"encoding/json"
8-
"errors"
98
"io/ioutil"
109
"os"
1110
"testing"
@@ -182,12 +181,12 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) {
182181
c.Assert(err, check.DeepEquals, queue.ErrDuplicateAddressedPush)
183182

184183
// Addresses should not be completed
185-
done, err := s.store.IsQueueAddressComplete("abc")
184+
queued, err := s.store.IsQueueAddressInProgress("abc")
186185
c.Assert(err, check.IsNil)
187-
c.Check(done, check.Equals, false)
188-
done, err = s.store.IsQueueAddressComplete("def")
186+
c.Check(queued, check.Equals, true)
187+
queued, err = s.store.IsQueueAddressInProgress("def")
189188
c.Assert(err, check.IsNil)
190-
c.Check(done, check.Equals, false)
189+
c.Check(queued, check.Equals, true)
191190

192191
// Attempt to pop a type that doesn't exist in the queue
193192
queueWork, err := s.store.QueuePop("test", 0, []uint64{888})
@@ -206,12 +205,12 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) {
206205
c.Assert(err, check.IsNil)
207206

208207
// First address should be completed
209-
done, err = s.store.IsQueueAddressComplete("abc")
208+
queued, err = s.store.IsQueueAddressInProgress("abc")
210209
c.Assert(err, check.IsNil)
211-
c.Check(done, check.Equals, true)
212-
done, err = s.store.IsQueueAddressComplete("def")
210+
c.Check(queued, check.Equals, false)
211+
queued, err = s.store.IsQueueAddressInProgress("def")
213212
c.Assert(err, check.IsNil)
214-
c.Check(done, check.Equals, false)
213+
c.Check(queued, check.Equals, true)
215214

216215
// Pop second item
217216
queueWork, err = s.store.QueuePop("test", 0, []uint64{TypeTest, TypeTest2})
@@ -225,22 +224,18 @@ func (s *QueueSqliteSuite) TestAddressedPush(c *check.C) {
225224
c.Assert(err, check.IsNil)
226225

227226
// Second address should be completed
228-
done, err = s.store.IsQueueAddressComplete("def")
227+
queued, err = s.store.IsQueueAddressInProgress("def")
229228
c.Assert(err, check.IsNil)
230-
c.Check(done, check.Equals, true)
229+
c.Check(queued, check.Equals, false)
231230

232-
// Record error for second address
233-
err = s.store.QueueAddressedComplete("def", errors.New("some error"))
231+
// Second address should be completed
232+
queued, err = s.store.IsQueueAddressInProgress("def")
234233
c.Assert(err, check.IsNil)
235-
236-
// Second address should be completed, but with error
237-
done, err = s.store.IsQueueAddressComplete("def")
238-
c.Assert(err, check.ErrorMatches, "some error")
239-
c.Check(done, check.Equals, true)
234+
c.Check(queued, check.Equals, false)
240235

241236
// Cannot check for empty address
242-
_, err = s.store.IsQueueAddressComplete(" ")
243-
c.Check(err, check.ErrorMatches, "no address provided for IsQueueAddressComplete")
237+
_, err = s.store.IsQueueAddressInProgress(" ")
238+
c.Check(err, check.ErrorMatches, "no address provided for IsQueueAddressInProgress")
244239
}
245240

246241
func (s *QueueSqliteSuite) TestQueueGroups(c *check.C) {
@@ -593,43 +588,6 @@ func (s *QueueSqliteSuite) TestQueueGroupCancel(c *check.C) {
593588
c.Check(cancelled, check.Equals, true)
594589
}
595590

596-
func (s *QueueSqliteSuite) TestAddressFailure(c *check.C) {
597-
// Fail
598-
address := "abcdefg"
599-
// Pass a generic error
600-
err := s.store.QueueAddressedComplete(address, errors.New("first error"))
601-
c.Assert(err, check.IsNil)
602-
err = s.store.(*store).QueueAddressedCheck(address)
603-
c.Check(err, check.ErrorMatches, "first error")
604-
605-
// Should be castable to queue.QueueError pointer
606-
queueError, ok := err.(*queue.QueueError)
607-
c.Assert(ok, check.Equals, true)
608-
c.Check(queueError, check.DeepEquals, &queue.QueueError{
609-
Code: 0, // Not set on generic error
610-
Message: "first error",
611-
})
612-
613-
// Fail again, but pass a typed error
614-
err = s.store.QueueAddressedComplete(address, &queue.QueueError{Code: 404, Message: "second error"})
615-
c.Assert(err, check.IsNil)
616-
err = s.store.(*store).QueueAddressedCheck(address)
617-
c.Check(err, check.ErrorMatches, "second error")
618-
619-
// Should be castable to queue.QueueError pointer
620-
queueError, ok = err.(*queue.QueueError)
621-
c.Assert(ok, check.Equals, true)
622-
c.Check(queueError, check.DeepEquals, &queue.QueueError{
623-
Code: 404,
624-
Message: "second error",
625-
})
626-
627-
// Don't fail
628-
err = s.store.QueueAddressedComplete(address, nil)
629-
c.Assert(err, check.IsNil)
630-
c.Check(s.store.(*store).QueueAddressedCheck(address), check.IsNil)
631-
}
632-
633591
func (s *QueueSqliteSuite) TestIsQueueAddressInProgress(c *check.C) {
634592
// Is a non-existing address in the queue?
635593
found, err := s.store.IsQueueAddressInProgress("def")
@@ -645,12 +603,17 @@ func (s *QueueSqliteSuite) TestIsQueueAddressInProgress(c *check.C) {
645603
c.Assert(err, check.IsNil)
646604
c.Assert(found, check.Equals, true)
647605

606+
// Assign a permit
607+
w, err := s.store.QueuePop("test", 0, []uint64{TypeTest})
608+
c.Assert(err, check.IsNil)
609+
c.Assert(w.Permit, check.Not(check.Equals), permit.Permit(0))
610+
648611
// Complete the work
649-
err = s.store.QueueAddressedComplete("def", nil)
612+
err = s.store.QueueDelete(w.Permit)
650613
c.Assert(err, check.IsNil)
651614
found, err = s.store.IsQueueAddressInProgress("def")
652615

653616
// Now it should not be found
654617
c.Assert(err, check.IsNil)
655-
c.Assert(found, check.Equals, true)
618+
c.Assert(found, check.Equals, false)
656619
}

pkg/rsqueue/agent/agent.go

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -349,8 +349,7 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j
349349
}
350350

351351
// Run the job (blocks)
352-
func() {
353-
352+
jobError := func() (result error) {
354353
// Extend the job's heartbeat in the queue periodically until the job completes.
355354
done := make(chan struct{})
356355
defer close(done)
@@ -362,7 +361,7 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j
362361
case <-ticker.C:
363362
// Extend the job visibility
364363
a.traceLogger.Debugf("Job of type %d visibility timeout needs to be extended: %d\n", queueWork.WorkType, queueWork.Permit)
365-
err := a.queue.Extend(queueWork.Permit)
364+
err = a.queue.Extend(queueWork.Permit)
366365
if err != nil {
367366
a.logger.Debugf("Error extending job for work type %d: %s", queueWork.WorkType, err)
368367
}
@@ -374,24 +373,17 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j
374373

375374
// Actually run the job
376375
a.traceLogger.Debugf("Running job with type %d, address '%s', and permit '%d'", queueWork.WorkType, queueWork.Address, queueWork.Permit)
377-
err := a.runner.Run(queue.RecursableWork{
376+
err = a.runner.Run(queue.RecursableWork{
378377
Work: queueWork.Work,
379378
WorkType: queueWork.WorkType,
380379
Context: ctx,
381380
})
382381
if err != nil {
382+
result = err
383383
a.logger.Debugf("Job type %d: address: %s work: %#v returned error: %s\n", queueWork.WorkType, queueWork.Address, string(queueWork.Work), err)
384384
}
385385

386-
// If the work was addressed, record the result
387-
if queueWork.Address != "" {
388-
// Note, `RecordFailure` will record the error if err != nil. If
389-
// err == nil, then it clears any recorded error for the address.
390-
err = a.queue.RecordFailure(queueWork.Address, err)
391-
if err != nil {
392-
a.logger.Debugf("Failed while recording addressed work success/failure: %s\n", err)
393-
}
394-
}
386+
return
395387
}()
396388

397389
// Decrement the job count
@@ -416,16 +408,16 @@ func (a *DefaultAgent) runJob(ctx context.Context, queueWork *queue.QueueWork, j
416408
// Delete the job from the queue
417409
a.traceLogger.Debugf("Deleting job from queue: %d\n", queueWork.Permit)
418410

419-
if err := a.queue.Delete(queueWork.Permit); err != nil {
420-
a.logger.Debugf("queue Delete() returned error: %s", err)
411+
if deleteErr := a.queue.Delete(queueWork.Permit); deleteErr != nil {
412+
a.logger.Debugf("queue Delete() returned error: %s", deleteErr)
421413
}
422414

423415
// Notify that work is complete if work is addressed. This must happen after the work has been deleted from the
424416
// queue.
425417
if queueWork.Address != "" {
426418
n := time.Now()
427419
a.traceLogger.Debugf("Ready to notify of address %s", queueWork.Address)
428-
notify(agenttypes.NewWorkCompleteNotification(queueWork.Address, a.notifyTypeWorkComplete))
420+
notify(agenttypes.NewWorkCompleteNotification(queueWork.Address, a.notifyTypeWorkComplete, jobError))
429421
a.traceLogger.Debugf("Notified of address %s in %d ms", queueWork.Address, time.Now().Sub(n).Nanoseconds()/1000000)
430422
}
431423

pkg/rsqueue/agent/agent_test.go

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ type FakeQueue struct {
117117
extend error
118118
mutex sync.Mutex
119119
record error
120-
errs []error
121120
}
122121

123122
func (*FakeQueue) Push(priority uint64, groupId int64, work queue.Work) error {
@@ -138,12 +137,6 @@ func (*FakeQueue) IsAddressInQueue(address string) (bool, error) {
138137
func (f *FakeQueue) PollAddress(address string) (errs <-chan error) {
139138
return f.pollErrs
140139
}
141-
func (f *FakeQueue) RecordFailure(address string, failure error) error {
142-
if f.record == nil {
143-
f.errs = append(f.errs, failure)
144-
}
145-
return f.record
146-
}
147140
func (*FakeQueue) Get(maxPriority uint64, maxPriorityChan chan uint64, types queue.QueueSupportedTypes, stop chan bool) (*queue.QueueWork, error) {
148141
return nil, nil
149142
}
@@ -214,9 +207,9 @@ func (s *AgentSuite) TestWaitBlocked(c *check.C) {
214207
}()
215208

216209
// Send some messages across the queue while blocked and make sure they're discarded
217-
msgs <- agenttypes.NewWorkCompleteNotification("somewhere1", 10)
218-
msgs <- agenttypes.NewWorkCompleteNotification("somewhere2", 10)
219-
msgs <- agenttypes.NewWorkCompleteNotification("somewhere3", 10)
210+
msgs <- agenttypes.NewWorkCompleteNotification("somewhere1", 10, nil)
211+
msgs <- agenttypes.NewWorkCompleteNotification("somewhere2", 10, nil)
212+
msgs <- agenttypes.NewWorkCompleteNotification("somewhere3", 10, nil)
220213

221214
c.Check(completed, check.Equals, false)
222215
jobDone <- 98
@@ -229,7 +222,6 @@ func (s *AgentSuite) TestWaitBlocked(c *check.C) {
229222
func (s *AgentSuite) TestRunJobOk(c *check.C) {
230223
q := &FakeQueue{
231224
extended: make(map[permit.Permit]int),
232-
errs: make([]error, 0),
233225
}
234226
finish := map[string]chan bool{
235227
"one": make(chan bool),
@@ -316,11 +308,6 @@ func (s *AgentSuite) TestRunJobOk(c *check.C) {
316308
c.Check(receivedPriority[1], check.Equals, MAX_CONCURRENCY)
317309
c.Check(receivedPriority[2], check.Equals, MAX_CONCURRENCY)
318310

319-
// Jobs b2 and b3 should have been marked as successful since they were addressed
320-
c.Check(q.errs, check.HasLen, 2)
321-
c.Check(q.errs[0], check.IsNil)
322-
c.Check(q.errs[1], check.IsNil)
323-
324311
// Jobs should have been deleted
325312
c.Check(q.deleted, check.Equals, 3)
326313

@@ -337,7 +324,6 @@ func (s *AgentSuite) TestRunJobOk(c *check.C) {
337324
func (s *AgentSuite) TestRunJobFails(c *check.C) {
338325
q := &FakeQueue{
339326
extended: make(map[permit.Permit]int),
340-
errs: make([]error, 0),
341327
}
342328
finish := map[string]chan bool{
343329
"one": make(chan bool),
@@ -363,7 +349,10 @@ func (s *AgentSuite) TestRunJobFails(c *check.C) {
363349
b1, err := json.Marshal(&w1)
364350
c.Assert(err, check.IsNil)
365351

366-
n := func(n listener.Notification) {}
352+
var notified listener.Notification
353+
n := func(n listener.Notification) {
354+
notified = n
355+
}
367356

368357
maxPriorityDone := make(chan struct{})
369358
go func() {
@@ -387,8 +376,10 @@ func (s *AgentSuite) TestRunJobFails(c *check.C) {
387376
<-maxPriorityDone
388377

389378
// Jobs should have been marked as failed since it was addressed
390-
c.Check(q.errs, check.HasLen, 1)
391-
c.Check(q.errs[0], check.ErrorMatches, "work failed")
379+
c.Assert(notified, check.FitsTypeOf, &agenttypes.WorkCompleteNotification{})
380+
workNotified, ok := notified.(*agenttypes.WorkCompleteNotification)
381+
c.Assert(ok, check.Equals, true)
382+
c.Assert(workNotified.Error, check.ErrorMatches, "work failed")
392383

393384
// Jobs should have been deleted
394385
c.Check(q.deleted, check.Equals, 1)

0 commit comments

Comments
 (0)