
Async Dispatching

Kenton Vizdos edited this page Mar 31, 2025 · 1 revision

AsyncDispatcher (Background Dispatcher)

The AsyncDispatcher in TypeQueue provides a background-safe, non-blocking way to dispatch messages to AWS SQS. It decouples producing messages from dispatching them, so your app keeps running without waiting on the network or handling dispatch errors inline.

🚀 Ideal Use Case

You run a Go service that generates messages from user actions or scheduled tasks and want to send those messages to SQS without blocking. You also want to capture failures centrally for logging, alerting, or retrying.

🧠 What It Is

AsyncDispatcher runs two goroutines:

  • A dispatch loop that reads from a channel and sends messages to SQS.
  • A failure loop that handles any messages that failed to dispatch (e.g. network errors).

This structure makes it easy to plug AsyncDispatcher into any service and get background delivery without blocking the calling goroutine or writing custom retry logic inline.
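
To make the two-loop structure concrete, here is a minimal standard-library sketch of the same shape. It is not TypeQueue's implementation: `failedMsg`, `flakySend`, and `runLoops` are hypothetical names, and `flakySend` stands in for a real SQS call.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// failedMsg pairs a message with the error that stopped it. Hypothetical type,
// standing in for the role typequeue.FailedMessage plays.
type failedMsg struct {
	Msg string
	Err error
}

var errSimulated = errors.New("simulated network error")

// flakySend stands in for a real SQS send: it fails for the message "bad".
func flakySend(m string) error {
	if m == "bad" {
		return errSimulated
	}
	return nil
}

// runLoops feeds msgs through a dispatch loop and a failure loop, then
// returns how many messages were handed to onFail.
func runLoops(msgs []string, send func(string) error, onFail func(failedMsg)) int {
	msgChan := make(chan string, len(msgs))
	failedChan := make(chan failedMsg, len(msgs))
	var wg sync.WaitGroup
	failures := 0

	wg.Add(2)
	go func() { // dispatch loop: read, send, forward failures
		defer wg.Done()
		defer close(failedChan)
		for m := range msgChan {
			if err := send(m); err != nil {
				failedChan <- failedMsg{Msg: m, Err: err}
			}
		}
	}()
	go func() { // failure loop: the only goroutine that touches failures
		defer wg.Done()
		for f := range failedChan {
			failures++
			onFail(f)
		}
	}()

	for _, m := range msgs {
		msgChan <- m
	}
	close(msgChan)
	wg.Wait() // both loops have exited, so reading failures is safe
	return failures
}

func main() {
	n := runLoops([]string{"a", "bad", "c"}, flakySend, func(f failedMsg) {
		fmt.Println("failed dispatch:", f.Msg, f.Err)
	})
	fmt.Println("failures:", n)
}
```

Keeping failures on their own channel means a slow error handler does not sit directly in the send path, although a full failure channel would still stall the dispatch loop.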

✅ When to Use

  • When you dispatch SQS messages in response to user or background events and want them sent asynchronously, without blocking.
  • When you want a centralized retry/logging strategy for dispatch failures.
  • When you want to decouple message creation from delivery, for example:
    • In an HTTP handler where you don’t want to block on SQS latency / handle dispatch errors inline.
    • In a task loop that emits messages but should stay fast.
  • When you want buffered dispatching and are okay with bounded memory (configurable via buffer size).
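
As an illustration of the HTTP handler case, the sketch below shows a handler that queues an event and returns immediately. It uses a plain `chan string` in place of the channel returned by `Setup`, and `enqueueStatus` and `signupHandler` are hypothetical names, so treat it as a shape to adapt rather than TypeQueue code.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// enqueueStatus tries to queue msg without blocking and returns the HTTP
// status the handler should send: 202 if queued, 503 if the buffer is full.
func enqueueStatus(ch chan<- string, msg string) int {
	select {
	case ch <- msg:
		return http.StatusAccepted
	default:
		return http.StatusServiceUnavailable
	}
}

// signupHandler is a hypothetical handler that emits one event per request.
func signupHandler(ch chan<- string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(enqueueStatus(ch, "user-signed-up"))
	}
}

func main() {
	msgChan := make(chan string, 1) // stands in for the channel from Setup
	h := signupHandler(msgChan)

	// Nothing drains msgChan here, so the second request finds it full.
	for i := 0; i < 2; i++ {
		rec := httptest.NewRecorder()
		h(rec, httptest.NewRequest(http.MethodPost, "/signup", nil))
		fmt.Println(rec.Code)
	}
}
```

Whether a full buffer should surface as a 503, a log line, or a silent drop depends on how much the event matters to the caller.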

💡 You can combine AsyncDispatcher with BatchedDispatcher. Just set AsyncDispatcher.Dispatcher = BatchedDispatcher. This lets you asynchronously buffer messages and still take advantage of batching behind the scenes.

❌ When Not to Use

  • You need guaranteed delivery before moving on (use a Dispatcher directly).
  • You cannot afford to drop messages on channel overflow (either implement backpressure, or fail hard with blocking writes).
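
One simple form of backpressure is a bounded wait: block for a short time, then report failure so the caller can slow down or reject work. The sketch below assumes a plain `chan string`; `sendWithin` is a hypothetical helper, not part of TypeQueue.

```go
package main

import (
	"fmt"
	"time"
)

// sendWithin tries to put msg on ch, waiting at most waitMillis milliseconds
// for a free slot. It reports whether the message was queued.
func sendWithin(ch chan<- string, msg string, waitMillis int) bool {
	timer := time.NewTimer(time.Duration(waitMillis) * time.Millisecond)
	defer timer.Stop()

	select {
	case ch <- msg:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	msgChan := make(chan string, 1) // stands in for the channel from Setup

	fmt.Println(sendWithin(msgChan, "first", 50))  // slot available
	fmt.Println(sendWithin(msgChan, "second", 50)) // buffer full, nothing draining
}
```

A plain blocking write (`msgChan <- msg`) is the limiting case with no timeout: the producer waits for as long as the dispatcher needs to free a slot.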

🔧 How It Works

  1. You call Setup(ctx, bufferLen) and receive a channel.
  2. You Start() the dispatcher (starts dispatch and error goroutines).
  3. You write to the returned channel whenever you want to dispatch a message.
  4. You cancel the context and call Stop(), which waits for both goroutines to drain their channels and exit.

dispatcher := &typequeue.AsyncDispatcher[*ExampleEvent]{
  Dispatcher:   yourDispatcher,
  TargetQueue:  "my-queue",
  ErrorHandler: func(f typequeue.FailedMessage[*ExampleEvent]) {
    log.Println("failed dispatch:", f.Error)
  },
}

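// The context passed to Setup controls shutdown (see cancelCtx below).
ctx, cancelCtx := context.WithCancel(context.Background())

msgChan := dispatcher.Setup(ctx, 100) // buffered channel, capacity 100
dispatcher.Start()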

msgChan <- &ExampleEvent{
  SQSAble: typequeue.SQSAble{TraceID: aws.String(uuid.NewString())},
  Message: "background hello",
}

// Later…
cancelCtx()
dispatcher.Stop()

⚠️ Channel Full Handling

By default, a send to msgChan or failedMsgChan blocks once that channel is full. You can make your own writes non-blocking like so:

select {
case msgChan <- yourMsg:
  // ok
default:
  log.Println("dispatcher buffer full — consider increasing buffer or logging fallback")
}

This pattern avoids blocking the calling goroutine and lets you decide what to do when the dispatcher is "at capacity."

🛑 Shutdown Behavior

  • The dispatcher listens to the killContext you provide.
  • Once canceled, it:
    • Stops accepting new work.
    • Drains all messages from msgChan and failedMsgChan.
    • Calls WaitGroup.Done() only after all messages are processed.
  • You must call Stop() to finalize the dispatcher lifecycle before exit.
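
The cancel-then-drain behavior described above can be sketched with the standard library alone. This is an assumption about the general pattern, not a copy of TypeQueue's loop; `runUntilDrained` and `drainDemo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// runUntilDrained processes messages until ctx is canceled, then handles
// whatever is still buffered before returning the total processed count.
func runUntilDrained(ctx context.Context, msgChan <-chan string, handle func(string)) int {
	processed := 0
	for {
		select {
		case m := <-msgChan:
			handle(m)
			processed++
		case <-ctx.Done():
			// Canceled: drain the buffer without waiting for new work.
			for {
				select {
				case m := <-msgChan:
					handle(m)
					processed++
				default:
					return processed
				}
			}
		}
	}
}

// drainDemo buffers n messages, cancels the context, and reports how many
// messages the loop still processed.
func drainDemo(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	msgChan := make(chan string, n)
	for i := 0; i < n; i++ {
		msgChan <- fmt.Sprintf("msg-%d", i)
	}
	cancel() // shutdown requested while messages are still buffered
	return runUntilDrained(ctx, msgChan, func(string) {})
}

func main() {
	fmt.Println("processed after cancel:", drainDemo(5))
}
```

The inner loop is why buffered messages survive a graceful shutdown: cancellation stops the wait for new work, not the processing of what is already queued.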

🔍 Observability Tips

To trace what’s happening:

  • Add a logger in ErrorHandler
  • Use metrics (e.g., count successful/failed dispatches, buffer full errors)
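
A minimal sketch of such counters, using only `sync/atomic`, might look like the following. `dispatchStats` and its methods are hypothetical; `queued` counts messages accepted into the buffer, which is not the same as confirmed delivery to SQS. In a real service, `onFailure` would be called from your ErrorHandler and the values exported through your metrics library.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// dispatchStats holds hypothetical counters that are safe to update from
// several goroutines.
type dispatchStats struct {
	queued     int64
	bufferFull int64
	failed     int64
}

// tryQueue counts the outcome of a non-blocking send.
func (s *dispatchStats) tryQueue(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		atomic.AddInt64(&s.queued, 1)
		return true
	default:
		atomic.AddInt64(&s.bufferFull, 1)
		return false
	}
}

// onFailure is the kind of work an ErrorHandler could do: count, then log.
func (s *dispatchStats) onFailure(err error) {
	atomic.AddInt64(&s.failed, 1)
	fmt.Println("failed dispatch:", err)
}

// snapshot returns the counters in the order queued, bufferFull, failed.
func (s *dispatchStats) snapshot() (int64, int64, int64) {
	return atomic.LoadInt64(&s.queued),
		atomic.LoadInt64(&s.bufferFull),
		atomic.LoadInt64(&s.failed)
}

func main() {
	var stats dispatchStats
	ch := make(chan string, 1) // stands in for the channel from Setup

	stats.tryQueue(ch, "a")
	stats.tryQueue(ch, "b") // buffer is full, counted as bufferFull
	stats.onFailure(errors.New("simulated network error"))

	fmt.Println(stats.snapshot())
}
```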

Gotchas

  • If you send to a closed channel after Stop(), you'll panic.
  • Always cancel() the context and call Stop() before exiting your app.
  • Messages are not persisted. If your process crashes mid-dispatch, messages in the buffer are lost.
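
One way to avoid the send-after-Stop panic is to route all producer writes through a small guard that is switched off before shutdown. The sketch below assumes, as the first gotcha implies, that the channel is closed during Stop(); `sendGuard` is a hypothetical helper and the explicit `close(ch)` only simulates that step.

```go
package main

import (
	"fmt"
	"sync"
)

// sendGuard wraps a channel so that sends after shutdown return false
// instead of panicking. Hypothetical helper, not part of TypeQueue.
type sendGuard struct {
	mu      sync.RWMutex
	stopped bool
	ch      chan<- string
}

// trySend queues msg unless the guard is stopped or the buffer is full.
func (g *sendGuard) trySend(msg string) bool {
	g.mu.RLock()
	defer g.mu.RUnlock()
	if g.stopped {
		return false
	}
	select {
	case g.ch <- msg:
		return true
	default:
		return false
	}
}

// stop blocks new sends. It returns only after in-flight trySend calls have
// finished, so closing the channel afterwards is safe.
func (g *sendGuard) stop() {
	g.mu.Lock()
	g.stopped = true
	g.mu.Unlock()
}

func main() {
	ch := make(chan string, 2) // stands in for the channel from Setup
	g := &sendGuard{ch: ch}

	fmt.Println(g.trySend("before shutdown"))

	g.stop()  // call this before canceling the context and calling Stop()
	close(ch) // simulates the channel being closed during shutdown

	fmt.Println(g.trySend("after shutdown")) // rejected without a panic
}
```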

Let us know if you build something cool with AsyncDispatcher or need help designing around it.
