Skip to content

Commit

Permalink
fix: ctx in stream callback handlers (#88)
Browse files Browse the repository at this point in the history
Change-Id: Ic4a3d2e0a22493ba389bd9db5a841977986b0eef
  • Loading branch information
shentongmartin authored Feb 26, 2025
1 parent 706ccb8 commit b769ad2
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
4 changes: 2 additions & 2 deletions compose/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func genericOnStartWithStreamInputHandle(ctx context.Context, input streamReader

cpy := input.copy

handle := func(handler icb.Handler, in streamReader) context.Context {
handle := func(ctx context.Context, handler icb.Handler, in streamReader) context.Context {
in_, ok := unpackStreamReader[icb.CallbackInput](in)
if !ok {
panic("impossible")
Expand All @@ -139,7 +139,7 @@ func genericOnEndWithStreamOutputHandle(ctx context.Context, output streamReader

cpy := output.copy

handle := func(handler icb.Handler, out streamReader) context.Context {
handle := func(ctx context.Context, handler icb.Handler, out streamReader) context.Context {
out_, ok := unpackStreamReader[icb.CallbackOutput](out)
if !ok {
panic("impossible")
Expand Down
8 changes: 4 additions & 4 deletions internal/callbacks/inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func OnWithStreamHandle[S any](
inOut S,
handlers []Handler,
cpy func(int) []S,
handle func(Handler, S) context.Context) (context.Context, S) {
handle func(context.Context, Handler, S) context.Context) (context.Context, S) {

if len(handlers) == 0 {
return ctx, inOut
Expand All @@ -104,7 +104,7 @@ func OnWithStreamHandle[S any](
inOuts := cpy(len(handlers) + 1)

for i, handler := range handlers {
ctx = handle(handler, inOuts[i])
ctx = handle(ctx, handler, inOuts[i])
}

return ctx, inOuts[len(inOuts)-1]
Expand All @@ -117,7 +117,7 @@ func OnStartWithStreamInputHandle[T any](ctx context.Context, input *schema.Stre

cpy := input.Copy

handle := func(handler Handler, in *schema.StreamReader[T]) context.Context {
handle := func(ctx context.Context, handler Handler, in *schema.StreamReader[T]) context.Context {
in_ := schema.StreamReaderWithConvert(in, func(i T) (CallbackInput, error) {
return i, nil
})
Expand All @@ -132,7 +132,7 @@ func OnEndWithStreamOutputHandle[T any](ctx context.Context, output *schema.Stre

cpy := output.Copy

handle := func(handler Handler, out *schema.StreamReader[T]) context.Context {
handle := func(ctx context.Context, handler Handler, out *schema.StreamReader[T]) context.Context {
out_ := schema.StreamReaderWithConvert(out, func(i T) (CallbackOutput, error) {
return i, nil
})
Expand Down

0 comments on commit b769ad2

Please sign in to comment.