Releases: kubecost/events
v0.0.8
v0.0.7
Version 0.0.6
What's Changed
Synchronous Dispatch and Receiving
While the internals of the dispatching remain asynchronous, the API allows for event dispatchers to block until events are "received." This behavior depends on both the dispatcher and the event handler. If I add an event stream handler like in the previous example:
dispatcher := events.NewDispatcher[MyEvent]()
// typically, a stream will be handled in a separate goroutine
go func() {
stream := dispatcher.NewEventStream()
for event := range stream.Stream() {
fmt.Println(event.Message)
}
}()Then, this receiver will always receive and handle events asynchronously. If you were dispatch an event using the DispatchSync(...) method on Dispatcher, the dispatcher will block until the event is dispatched to the receiver. However, what you wanted the DispatchSync(...) to block until the event is received and handled? For this, you will need to change the range loop in your handler slightly:
go func() {
// This line is the same
stream := dispatcher.NewEventStream()
// We changed the variable name to `syncEvent` and we changed the method from `Stream()` to `SyncStream()`
for syncEvent := range stream.SyncStream() {
// As a convention, to _ensure_ that `syncEvent.Done()` is called when the event is handled, we can
// wrap our handling code in an anonymous function, and use `defer syncEvent.Done()`.
func() {
// Notifies the dispatcher that the event has been handled.
defer syncEvent.Done()
// The `Event` field contains the event that was dispatched
event := syncEvent.Event
fmt.Println(event.Message)
}()
}
}()NOTE The syncEvent.Done() call is important, as it ensures that the dispatcher will not block indefinitely. This can be called at any point in the handling of the SyncEvent[T], but it's best to defer the call within an anonymous function to ensure that it is called. It's also very important to be weary of potential panics in your handling code (another good reason to use defer). If being cautious in a handler is not possible, and you wish to avoid the potential to block and leak goroutines, you can use DispatchSyncWithTimeout(event T, timeout time.Duration) to dispatch an event which will never block longer than the specified amount of time.
To summarize, if you want to use synchronous dispatch and handling, you would use:
dispatcher.DispatchSync()ordispatcher.DispatchSyncWithTimeout()to dispatch an eventeventStream.SyncStream()to receive the event, and make sure to callsyncEvent.Done()when the event is handled.
If you only want the dispatcher to block until the event is dispatched, you can use:
dispatcher.DispatchSync()ordispatcher.DispatchSyncWithTimeout()to dispatch an eventeventStream.Stream()to receive the event where the event arrives directly on the stream
If you want no blocking behavior, use:
dispatcher.Dispatch()to dispatch an eventeventStream.Stream()to receive the event where the event arrives directly on the stream.
The other possible combination would occur if you used SyncStream() on the EventStream, and then used Dispatch() on the Dispatcher. This behaves similarly to the non-blocking behavior.
This means that no matter what, an event stream will always receive a dispatched event, whether that dispatch was made using Dispatch or DispatchSync.
Full Changelog: v0.0.4...v0.0.6
Version 0.0.5
What's Changed
Synchronous Dispatch and Receiving
While the internals of the dispatching remain asynchronous, the API allows for event dispatchers to block until events are "received." This behavior depends on both the dispatcher and the event handler. If I add an event stream handler like in the previous example:
dispatcher := events.NewDispatcher[MyEvent]()
// typically, a stream will be handled in a separate goroutine
go func() {
stream := dispatcher.NewEventStream()
for event := range stream.Stream() {
fmt.Println(event.Message)
}
}()Then, this receiver will always receive and handle events asynchronously. If you were dispatch an event using the DispatchSync(...) method on Dispatcher, the dispatcher will block until the event is dispatched to the receiver. However, what you wanted the DispatchSync(...) to block until the event is received and handled? For this, you will need to change the range loop in your handler slightly:
go func() {
// This line is the same
stream := dispatcher.NewEventStream()
// We changed the variable name to `syncEvent` and we changed the method from `Stream()` to `SyncStream()`
for syncEvent := range stream.SyncStream() {
// As a convention, to _ensure_ that `syncEvent.Done()` is called when the event is handled, we can
// wrap our handling code in an anonymous function, and use `defer syncEvent.Done()`.
func() {
// Notifies the dispatcher that the event has been handled.
defer syncEvent.Done()
// The `Event` field contains the event that was dispatched
event := syncEvent.Event
fmt.Println(event.Message)
}()
}
}()NOTE The syncEvent.Done() call is important, as it ensures that the dispatcher will not block indefinitely. This can be called at any point in the handling of the SyncEvent[T], but it's best to defer the call within an anonymous function to ensure that it is called. It's also very important to be weary of potential panics in your handling code (another good reason to use defer). If being cautious in a handler is not possible, and you wish to avoid the potential to block and leak goroutines, you can use DispatchSyncWithTimeout(event T, timeout time.Duration) to dispatch an event which will never block longer than the specified amount of time.
To summarize, if you want to use synchronous dispatch and handling, you would use:
dispatcher.DispatchSync()ordispatcher.DispatchSyncWithTimeout()to dispatch an eventeventStream.SyncStream()to receive the event, and make sure to callsyncEvent.Done()when the event is handled.
If you only want the dispatcher to block until the event is dispatched, you can use:
dispatcher.DispatchSync()ordispatcher.DispatchSyncWithTimeout()to dispatch an eventeventStream.Stream()to receive the event where the event arrives directly on the stream
If you want no blocking behavior, use:
dispatcher.Dispatch()to dispatch an eventeventStream.Stream()to receive the event where the event arrives directly on the stream.
The other possible combination would occur if you used SyncStream() on the EventStream, and then used Dispatch() on the Dispatcher. This behaves similarly to the non-blocking behavior.
This means that no matter what, an event stream will always receive a dispatched event, whether that dispatch was made using Dispatch or DispatchSync.
Full Changelog: v0.0.4...v0.0.5
Version 0.0.4
What's Changed
- Added
events.Dispatch[T any](event T)method which will dispatch the provided event via the globalTdispatcher. - Added persistent flag to allow dispatcher instances to persist their processing goroutine after calling
CloseEventStreams(). This is specifically meant to prevent global dispatcher instances from being shutdown.
Full Changelog
Version 0.0.3
What's Changed
- Added Filtered Event Streams to the API for allowing distinct event routes based on custom predicates.
- Added all common use cases to README with better examples.
Full Changelog
Version 0.0.2
Breaking API Changes
DispatcherFor[T]is nowGlobalDispatcherFor[T]- Instanced dispatcher creation via
NewDispatcher[T]
Changes
- Improved some awkward data structure interactions and replaced some unnecessary locking with atomic flag.
- Fixed an issue with tests, running them concurrently or even fast enough would leak previous test data into the next test.