Skip to content

Commit

Permalink
Merge branch 'main' into receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
TusharMohapatra07 committed Dec 14, 2024
2 parents 3066fdc + cc53141 commit 55fbaa4
Show file tree
Hide file tree
Showing 22 changed files with 176 additions and 1,169 deletions.
94 changes: 0 additions & 94 deletions api/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -231,100 +231,6 @@ paths:
409:
description: Promise already exists

/promises/callback:
post:
tags:
- Promises
summary: Create promise and callback
operationId: createPromiseAndCallback
parameters:
- in: header
name: request-id
description: Unique tracking id
schema:
type: string
- in: header
name: idempotency-key
description: Deduplicates requests
schema:
type: string
- in: header
name: strict
description: If true, deduplicates only when promise state matches the request
schema:
type: boolean
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- promise
- callback
properties:
promise:
type: object
required:
- id
- timeout
properties:
id:
type: string
timeout:
type: integer
format: int64
param:
$ref: "#/components/schemas/Value"
tags:
type: object
additionalProperties:
type: string
callback:
type: object
required:
- id
- rootPromiseId
- timeout
- recv
properties:
id:
type: string
rootPromiseId:
type: string
timeout:
type: integer
format: int64
recv:
$ref: "#/components/schemas/Recv"
responses:
200:
description: Operation deduplicated, promise fetched successfully
content:
application/json:
schema:
type: object
properties:
promise:
$ref: "#/components/schemas/Promise"
201:
description: Created
content:
application/json:
schema:
type: object
properties:
promise:
$ref: "#/components/schemas/Promise"
callback:
$ref: "#/components/schemas/Callback"
400:
description: Invalid request
403:
description: Forbidden request
409:
description: Promise already exists

/promises/{id}:
get:
tags:
Expand Down
1 change: 0 additions & 1 deletion cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func RunDSTCmd() *cobra.Command {
system.AddOnRequest(t_api.SearchPromises, coroutines.SearchPromises)
system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(t_api.CreatePromiseAndTask, coroutines.CreatePromiseAndTask)
system.AddOnRequest(t_api.CreatePromiseAndCallback, coroutines.CreatePromiseAndCallback)
system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise)
system.AddOnRequest(t_api.CreateCallback, coroutines.CreateCallback)
system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule)
Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
var rootCmd = &cobra.Command{
Use: "resonate",
Short: "Resonate: distributed async await",
Version: "0.6.0", // This needs to be bumped when new versions are released.
Version: "0.7.0", // This needs to be bumped when new versions are released.
}

func init() {
Expand Down
1 change: 0 additions & 1 deletion cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func ServeCmd() *cobra.Command {
system.AddOnRequest(t_api.SearchPromises, coroutines.SearchPromises)
system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(t_api.CreatePromiseAndTask, coroutines.CreatePromiseAndTask)
system.AddOnRequest(t_api.CreatePromiseAndCallback, coroutines.CreatePromiseAndCallback)
system.AddOnRequest(t_api.CreateCallback, coroutines.CreateCallback)
system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise)
system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule)
Expand Down
49 changes: 6 additions & 43 deletions internal/app/coroutines/createPromise.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package coroutines

import (
"fmt"
"log/slog"

"github.com/resonatehq/gocoro"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/callback"
"github.com/resonatehq/resonate/pkg/message"
"github.com/resonatehq/resonate/pkg/promise"
"github.com/resonatehq/resonate/pkg/task"
Expand All @@ -17,15 +15,15 @@ import (
func CreatePromise(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r *t_api.Request) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreatePromise, "must be create promise")

return createPromiseAndTaskOrCallback(c, r, r.CreatePromise)
return createPromiseAndTask(c, r, r.CreatePromise)
}

func CreatePromiseAndTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r *t_api.Request) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreatePromiseAndTask, "must be create promise and task")
util.Assert(r.CreatePromiseAndTask.Promise.Id == r.CreatePromiseAndTask.Task.PromiseId, "promise ids must match")
util.Assert(r.CreatePromiseAndTask.Promise.Timeout == r.CreatePromiseAndTask.Task.Timeout, "timeouts must match")

return createPromiseAndTaskOrCallback(c, r, r.CreatePromiseAndTask.Promise, &t_aio.Command{
return createPromiseAndTask(c, r, r.CreatePromiseAndTask.Promise, &t_aio.Command{
Kind: t_aio.CreateTask,
CreateTask: &t_aio.CreateTaskCommand{
Recv: r.CreatePromiseAndTask.Task.Recv,
Expand All @@ -40,38 +38,20 @@ func CreatePromiseAndTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completio
})
}

func CreatePromiseAndCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r *t_api.Request) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreatePromiseAndCallback, "must be create promise and callback")
util.Assert(r.CreatePromiseAndCallback.Promise.Id == r.CreatePromiseAndCallback.Callback.PromiseId, "promise ids must match")

return createPromiseAndTaskOrCallback(c, r, r.CreatePromiseAndCallback.Promise, &t_aio.Command{
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.CreateCallbackCommand{
Id: fmt.Sprintf("%s.%s", r.CreatePromiseAndCallback.Promise.Id, r.CreatePromiseAndCallback.Callback.Id),
PromiseId: r.CreatePromiseAndCallback.Callback.PromiseId,
Recv: r.CreatePromiseAndCallback.Callback.Recv,
Mesg: &message.Mesg{Type: message.Resume, Root: r.CreatePromiseAndCallback.Callback.RootPromiseId, Leaf: r.CreatePromiseAndCallback.Callback.PromiseId},
Timeout: r.CreatePromiseAndCallback.Callback.Timeout,
CreatedOn: c.Time(),
},
})
}

func createPromiseAndTaskOrCallback(
func createPromiseAndTask(
c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],
r *t_api.Request,
createPromiseReq *t_api.CreatePromiseRequest,
additionalCmds ...*t_aio.Command,
) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreatePromise || r.Kind == t_api.CreatePromiseAndTask || r.Kind == t_api.CreatePromiseAndCallback, "must be create promise or variant")
util.Assert(r.Kind == t_api.CreatePromise || r.Kind == t_api.CreatePromiseAndTask, "must be create promise or variant")

// response status
var status t_api.StatusCode

// response data
var p *promise.Promise
var t *task.Task
var cb *callback.Callback

// first read the promise to see if it already exists
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Expand Down Expand Up @@ -120,7 +100,7 @@ func createPromiseAndTaskOrCallback(
if completion.Store.Results[0].CreatePromise.RowsAffected == 0 {
// It's possible that the promise was created by another coroutine
// while we were creating. In that case, we should just retry.
return createPromiseAndTaskOrCallback(c, r, createPromiseReq, additionalCmds...)
return createPromiseAndTask(c, r, createPromiseReq, additionalCmds...)
}

// set status
Expand Down Expand Up @@ -156,21 +136,6 @@ func createPromiseAndTaskOrCallback(
ExpiresAt: cmd.ExpiresAt,
CreatedOn: &cmd.CreatedOn,
}
case t_api.CreatePromiseAndCallback:
util.Assert(additionalCmds[0].Kind == t_aio.CreateCallback, "command must be create callback")
util.Assert(completion.Store.Results[1].Kind == t_aio.CreateCallback, "completion must be create callback")
cmd := additionalCmds[0].CreateCallback

if completion.Store.Results[1].CreateCallback.RowsAffected != 0 {
cb = &callback.Callback{
Id: cmd.Id,
PromiseId: cmd.PromiseId,
Recv: cmd.Recv,
Mesg: cmd.Mesg,
Timeout: cmd.Timeout,
CreatedOn: cmd.CreatedOn,
}
}
}
} else {
p, err = result.Records[0].Promise()
Expand All @@ -196,7 +161,7 @@ func createPromiseAndTaskOrCallback(
if !ok {
// It's possible that the promise was created by another coroutine
// while we were timing out. In that case, we should just retry.
return createPromiseAndTaskOrCallback(c, r, createPromiseReq, additionalCmds...)
return createPromiseAndTask(c, r, createPromiseReq, additionalCmds...)
}

// set status to ok if not strict and idempotency keys match
Expand Down Expand Up @@ -226,8 +191,6 @@ func createPromiseAndTaskOrCallback(
res.CreatePromise = &t_api.CreatePromiseResponse{Status: status, Promise: p}
case t_api.CreatePromiseAndTask:
res.CreatePromiseAndTask = &t_api.CreatePromiseAndTaskResponse{Status: status, Promise: p, Task: t}
case t_api.CreatePromiseAndCallback:
res.CreatePromiseAndCallback = &t_api.CreatePromiseAndCallbackResponse{Status: status, Promise: p, Callback: cb}
}

return res, nil
Expand Down
2 changes: 0 additions & 2 deletions internal/app/subsystems/api/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ func TestGrpc(t *testing.T) {
res, err = grpcTest.promises.CreatePromise(ctx, req)
case *pb.CreatePromiseAndTaskRequest:
res, err = grpcTest.promises.CreatePromiseAndTask(ctx, req)
case *pb.CreatePromiseAndCallbackRequest:
res, err = grpcTest.promises.CreatePromiseAndCallback(ctx, req)
case *pb.ResolvePromiseRequest:
res, err = grpcTest.promises.ResolvePromise(ctx, req)
case *pb.RejectPromiseRequest:
Expand Down
Loading

0 comments on commit 55fbaa4

Please sign in to comment.