
Batch Dispatching

Kenton Vizdos edited this page Mar 12, 2025 · 5 revisions

Batch Dispatching with TypeQueue

The BatchedDispatcher in TypeQueue allows you to group messages into batches (by default, batches of 10, the SQS limit) and dispatch them together. This can significantly improve throughput and reduce network overhead when you have bursts of messages to send.

Warning

The BatchedDispatcher isn't designed to run as a continuously running background process. It is ideal for scenarios where a specific event or user action triggers a burst of SQS messages. There is no time-based flushing; messages are only sent automatically once the batch accumulates 10 items. Be warned!

(If you would like to contribute a ContinuousBatchedDispatcher, please open a PR! It is definitely welcome.)

Good to Know

typequeue.BatchedDispatcher{} fulfills both the TypeQueueDispatcher interface AND the TypeQueueBatchDispatcher interface. If you are writing a function that uses batches, accept a TypeQueueBatchDispatcher so you have access to .Flush()! That said, the Mock Dispatcher fulfills both, so you can still test batched code with the mock.

How It Works

  • Batching Logic: Messages dispatched via the BatchedDispatcher are accumulated into an internal batch. Once the batch reaches a predetermined size (10 messages), it is immediately sent as a single SQS batch call.

  • Manual Flush: There are no automatic flushes. If your final batch contains fewer than 10 messages, you must call Flush() to send the remaining messages before your application shuts down. This is critical to ensure no messages remain undelivered. Always call .Flush() after you've sent your messages.

  • Concurrency Management: The dispatcher handles concurrent calls to .Dispatch() behind the scenes. However, if you dispatch messages concurrently (for example, within separate goroutines), you should coordinate using a WaitGroup or similar mechanism before calling Flush().

Example Code (sync)

Below is an example (inspired by our test cases) that demonstrates how to use the BatchedDispatcher, calling .Dispatch synchronously:

// TestMessage implements the SQSAble interface.
type TestMessage struct {
	typequeue.SQSAble
	Message string `json:"msg"`
}

func main() {
	// Create a new BatchedDispatcher.
	dispatcher := typequeue.NewBatchedDispatcher[*TestMessage](
		context.Background(),
		logrus.New(), // critical: if errors are found after sending a message, they will only be LOGGED.
		sqsClient,
		nil, // No custom target URL resolver
		100, // Buffer size.
		5,   // Max concurrent network calls. Keep this relatively low if running in Lambda-land.
	)

	totalMessages := 12 // Dispatch 12 messages to trigger a full batch, with 2 left over for a manual Flush.
	ctxWithTrace := context.WithValue(context.Background(), "trace-id", "batch-trace")

	for i := range totalMessages {
		msg := &TestMessage{Message: fmt.Sprintf("Message %d", i)}
		_, err := dispatcher.Dispatch(ctxWithTrace, msg, "my-queue")
		if err != nil {
			fmt.Println("Dispatch error:", err)
		}
	}

	// Call Flush() before shutdown to ensure all pending messages are delivered.
	dispatcher.Flush()
}

What Happens Behind the Scenes

There is a lot happening behind the scenes, and I highly recommend reading the code behind this function (it has a lot of concurrency management built in that's great to learn from!), but here is the overview:

  • .Dispatch() adds the message to an internal batch queue
  • Once the batch reaches a length of 10, its messages are concurrently sent to SQS and the batch is reset to empty
  • The final 2 messages (10 auto-flushed, 2 remaining) are sent to SQS by the .Flush() call

Example Code (async / concurrent)

If you are calling .Dispatch() from a goroutine, it is critical that you track progress with a WaitGroup. Flushing handles many of the safety nets, but a .Dispatch() still running in a goroutine when .Flush() is called (which it must be, eventually) will cause problems.

func main() {
	// Create a new BatchedDispatcher.
	dispatcher := typequeue.NewBatchedDispatcher[*TestMessage](
		context.Background(),
		logrus.New(), // critical: if errors are found after sending a message, they will only be LOGGED.
		sqsClient,
		nil, // No custom target URL resolver
		100, // Buffer size.
		5,   // Max concurrent network calls. Keep this relatively low if running in Lambda-land.
	)

	var wg sync.WaitGroup
	totalMessages := 12 // Dispatch 12 messages to trigger a full batch, with 2 left over for a manual Flush.
	ctxWithTrace := context.WithValue(context.Background(), "trace-id", "batch-trace")

	for i := range totalMessages {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			msg := &TestMessage{Message: fmt.Sprintf("Message %d", i)}
			_, err := dispatcher.Dispatch(ctxWithTrace, msg, "my-queue")
			if err != nil {
				fmt.Println("Dispatch error:", err)
			}
		}(i)
	}

	// Wait for all dispatch calls to complete.
	wg.Wait()

	// Call Flush() before shutdown to ensure all pending messages are delivered.
	dispatcher.Flush()
}

Key Points to Remember

  • Wait Before Flushing: When dispatching concurrently (within a goroutine), ensure all goroutines have completed (e.g., using a WaitGroup) before calling Flush().

  • Manual Flush Is Required: The dispatcher does not automatically flush partial batches. Always call Flush() before application shutdown or at appropriate trigger points to send any remaining messages.

  • Use Cases: BatchedDispatcher is ideal for triggered events where a surge of SQS messages is expected. It is not designed to continuously batch in the background.

Reference Test Cases

For more in-depth examples, refer to our test files:

TestBatchedDispatcherFlush: Verifies that flushing sends any remaining messages when the batch is incomplete.

TestBatchedDispatcherFullBatch: Demonstrates that dispatching exactly 10 messages immediately triggers a batch delivery.

TestBatchedDispatcherConcurrentDispatch: Stresses the dispatcher by dispatching a large number of messages concurrently and ensuring all are delivered.

These tests illustrate the concurrency management and flushing behavior behind the scenes.
