Skip to content

Commit

Permalink
Add notifications api (#531)
Browse files Browse the repository at this point in the history
* Add notifications api

* Add basic api tests

* Add notify command to resonate cli

* Add notify cmd

* Add promise to task message

* Send Notify at most once + DST

* Apply suggestions from code review

Co-authored-by: David Farr <[email protected]>

---------

Co-authored-by: David Farr <[email protected]>
  • Loading branch information
avillega and dfarr authored Jan 22, 2025
1 parent 89c1a45 commit de5339e
Show file tree
Hide file tree
Showing 29 changed files with 1,480 additions and 13 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ gen-proto:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/promise.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/callback_t.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/callback.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/notify.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/schedule_t.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/schedule.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/lock.proto
Expand Down
58 changes: 58 additions & 0 deletions api/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,64 @@ paths:
404:
description: Promise not found

# Notify
/notify:
post:
tags:
- Notify
summary: Create a Notification
operationId: createNotify
parameters:
- in: header
name: request-id
description: Unique tracking id
schema:
type: string
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- id
- promiseId
- timeout
- recv
properties:
id:
type: string
promiseId:
type: string
timeout:
type: integer
format: int64
recv:
$ref: "#/components/schemas/Recv"
responses:
200:
description: OK
content:
application/json:
schema:
type: object
properties:
promise:
$ref: "#/components/schemas/Promise"
201:
description: Created
content:
application/json:
schema:
type: object
properties:
callback:
$ref: "#/components/schemas/Callback"
promise:
$ref: "#/components/schemas/Promise"
404:
description: Promise not found

# Schedules
/schedules:
get:
Expand Down
96 changes: 96 additions & 0 deletions cmd/notify/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package notify

import (
"context"
"encoding/json"
"errors"
"strings"
"time"

"github.com/resonatehq/resonate/pkg/client"
v1 "github.com/resonatehq/resonate/pkg/client/v1"
"github.com/spf13/cobra"
)

var notifyExample = `
# Create a notify
resonate notify create foo --promise-id bar --timeout 1h --recv default
# Create a notify with url
resonate notify create foo --promise-id bar --timeout 1h --recv poll://default/6fa89b7e-4a56-40e8-ba4e-78864caa3278
# Create a notify with object
resonate notify create foo --promise-id bar --timeout 1h --recv {"type": "poll", "data": {"group": "default", "id": "6fa89b7e-4a56-40e8-ba4e-78864caa3278"}}
`

func CreateNotifyCmd(c client.Client) *cobra.Command {
var (
promiseId string
timeout time.Duration
recvStr string
)
cmd := &cobra.Command{
Use: "create <id>",
Short: "Create notify",
Example: notifyExample,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return errors.New("must specify an id")
}

id := args[0]

var recv v1.Recv

if json.Valid([]byte(recvStr)) {
var recv0 v1.Recv0

if err := json.Unmarshal([]byte(recvStr), &recv0); err != nil {
return err
}
if err := recv.FromRecv0(recv0); err != nil {
return err
}
} else {
if err := recv.FromRecv1(recvStr); err != nil {
return err
}
}

body := v1.CreateNotifyJSONRequestBody{
Id: id,
PromiseId: promiseId,
Timeout: time.Now().Add(timeout).UnixMilli(),
Recv: recv,
}

res, err := c.V1().CreateNotifyWithResponse(context.TODO(), nil, body)
if err != nil {
return err
}

if res.StatusCode() == 201 {
cmd.Printf("Created notification: %s\n", id)
} else if res.StatusCode() == 200 {
if res.JSON200.Promise != nil && res.JSON200.Promise.State != v1.PromiseStatePENDING {
cmd.Printf("Promise %s already %s\n", promiseId, strings.ToLower(string(res.JSON200.Promise.State)))
} else {
cmd.Printf("Created notification: %s (deduplicated)\n", id)
}
} else {
cmd.PrintErrln(res.Status(), string(res.Body))
}

return nil
},
}

cmd.Flags().StringVar(&promiseId, "promise-id", "", "promise id")
cmd.Flags().DurationVar(&timeout, "timeout", 0, "task timeout")
cmd.Flags().StringVar(&recvStr, "recv", "default", "task receiver")

_ = cmd.MarkFlagRequired("promise-id")
_ = cmd.MarkFlagRequired("timeout")

return cmd
}
41 changes: 41 additions & 0 deletions cmd/notify/notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package notify

import (
"github.com/resonatehq/resonate/pkg/client"
"github.com/spf13/cobra"
)

func NewCmd() *cobra.Command {
var (
c = client.New()
server string
username string
password string
)

cmd := &cobra.Command{
Use: "notifications",
Aliases: []string{"notifications"},
Short: "Resonate notifications",
Run: func(cmd *cobra.Command, args []string) {
_ = cmd.Help()
},
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if username != "" || password != "" {
c.SetBasicAuth(username, password)
}

return c.Setup(server)
},
}

// Add subcommands
cmd.AddCommand(CreateNotifyCmd(c))

// Flags
cmd.PersistentFlags().StringVarP(&server, "server", "", "http://localhost:8001", "resonate url")
cmd.PersistentFlags().StringVarP(&username, "username", "U", "", "basic auth username")
cmd.PersistentFlags().StringVarP(&password, "password", "P", "", "basic auth password")

return cmd
}
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/resonatehq/resonate/cmd/callbacks"
"github.com/resonatehq/resonate/cmd/dst"
"github.com/resonatehq/resonate/cmd/notify"
"github.com/resonatehq/resonate/cmd/promises"
"github.com/resonatehq/resonate/cmd/quickstart"
"github.com/resonatehq/resonate/cmd/schedules"
Expand Down Expand Up @@ -41,6 +42,7 @@ func init() {
rootCmd.AddCommand(quickstart.NewCmd())
rootCmd.AddCommand(tasks.NewCmd())
rootCmd.AddCommand(callbacks.NewCmd())
rootCmd.AddCommand(notify.NewCmd())

// Set default output
rootCmd.SetOut(os.Stdout)
Expand Down
1 change: 1 addition & 0 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func ServeCmd() *cobra.Command {
system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(t_api.CreatePromiseAndTask, coroutines.CreatePromiseAndTask)
system.AddOnRequest(t_api.CreateCallback, coroutines.CreateCallback)
system.AddOnRequest(t_api.CreateNotify, coroutines.CreateNotify)
system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise)
system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule)
system.AddOnRequest(t_api.SearchSchedules, coroutines.SearchSchedules)
Expand Down
140 changes: 140 additions & 0 deletions internal/app/coroutines/createNotify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package coroutines

import (
"fmt"
"log/slog"

"github.com/resonatehq/gocoro"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/callback"
"github.com/resonatehq/resonate/pkg/message"
"github.com/resonatehq/resonate/pkg/promise"
)

func CreateNotify(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r *t_api.Request) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreateNotify, "Request kind must be CreateNotify")
var res *t_api.Response

// read the promise to see if it exists
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Store: &t_aio.StoreSubmission{
Transaction: &t_aio.Transaction{
Commands: []*t_aio.Command{
{
Kind: t_aio.ReadPromise,
ReadPromise: &t_aio.ReadPromiseCommand{
Id: r.CreateNotify.PromiseId,
},
},
},
},
},
})

if err != nil {
slog.Error("failed to read promise", "req", r, "err", err)
return nil, t_api.NewError(t_api.StatusAIOStoreError, err)
}

util.Assert(completion.Store != nil, "completion must not be nil")
util.Assert(len(completion.Store.Results) == 1, "completion must have one result")

result := completion.Store.Results[0].ReadPromise
util.Assert(result != nil, "result must not be nil")
util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows")

if result.RowsReturned == 1 {
p, err := result.Records[0].Promise()
if err != nil {
slog.Error("failed to parse promise record", "record", result.Records[0], "err", err)
return nil, t_api.NewError(t_api.StatusAIOStoreError, err)
}

// If the notify is already created return 200 and an empty notify
var cb *callback.Callback
status := t_api.StatusOK

if p.State == promise.Pending {
mesg := &message.Mesg{
Type: message.Notify,
Root: r.CreateNotify.PromiseId,
}

createdOn := c.Time()

callbackId := fmt.Sprintf("%s.%s", r.CreateNotify.PromiseId, r.CreateNotify.Id)
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Store: &t_aio.StoreSubmission{
Transaction: &t_aio.Transaction{
Commands: []*t_aio.Command{
{
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.CreateCallbackCommand{
Id: callbackId,
PromiseId: r.CreateNotify.PromiseId,
Recv: r.CreateNotify.Recv,
Mesg: mesg,
Timeout: r.CreateNotify.Timeout,
CreatedOn: createdOn,
},
},
},
},
},
})

if err != nil {
slog.Error("failed to create notify", "req", r, "err", err)
return nil, t_api.NewError(t_api.StatusAIOStoreError, err)
}

util.Assert(completion.Store != nil, "completion must not be nil")
util.Assert(len(completion.Store.Results) == 1, "completion must have one result")

result := completion.Store.Results[0].CreateCallback
util.Assert(result != nil, "result must not be nil")
util.Assert(result.RowsAffected == 0 || result.RowsAffected == 1, "result must return 0 or 1 rows")

if result.RowsAffected == 1 {
status = t_api.StatusCreated
cb = &callback.Callback{
Id: callbackId,
PromiseId: r.CreateNotify.PromiseId,
Recv: r.CreateNotify.Recv,
Mesg: mesg,
Timeout: r.CreateNotify.Timeout,
CreatedOn: createdOn,
}
}
}

res = &t_api.Response{
Kind: t_api.CreateNotify,
Tags: r.Tags,
CreateNotify: &t_api.CreateNotifyResponse{
// Status could be StatusOk or StatusCreated if the Callback Id was already present
Status: status,
Callback: cb,
Promise: p,
},
}

} else {
res = &t_api.Response{
Kind: t_api.CreateNotify,
Tags: r.Tags,
CreateNotify: &t_api.CreateNotifyResponse{
Status: t_api.StatusPromiseNotFound,
},
}
}

util.Assert(res != nil, "response must not be nil")
return res, nil
}
Loading

0 comments on commit de5339e

Please sign in to comment.