Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add notifications api #531

Merged
merged 7 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ gen-proto:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/promise.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/callback_t.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/callback.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/notify.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/schedule_t.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/schedule.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/lock.proto
Expand Down
58 changes: 58 additions & 0 deletions api/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,64 @@ paths:
404:
description: Promise not found

# Notify
/notify:
post:
tags:
- Notify
summary: Create a Notification
operationId: createNotify
parameters:
- in: header
name: request-id
description: Unique tracking id
schema:
type: string
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- id
- promiseId
- timeout
- recv
properties:
id:
type: string
promiseId:
type: string
timeout:
type: integer
format: int64
recv:
$ref: "#/components/schemas/Recv"
responses:
200:
description: OK
content:
application/json:
schema:
type: object
properties:
promise:
$ref: "#/components/schemas/Promise"
201:
description: Created
content:
application/json:
schema:
type: object
properties:
callback:
$ref: "#/components/schemas/Callback"
promise:
$ref: "#/components/schemas/Promise"
404:
description: Promise not found

# Schedules
/schedules:
get:
Expand Down
96 changes: 96 additions & 0 deletions cmd/notify/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package notify

import (
"context"
"encoding/json"
"errors"
"strings"
"time"

"github.com/resonatehq/resonate/pkg/client"
v1 "github.com/resonatehq/resonate/pkg/client/v1"
"github.com/spf13/cobra"
)

var notifyExample = `
# Create a notify
resonate notify create foo --promise-id bar --timeout 1h --recv default

# Create a notify with url
resonate notify create foo --promise-id bar --timeout 1h --recv poll://default/6fa89b7e-4a56-40e8-ba4e-78864caa3278

# Create a notify with object
resonate notify create foo --promise-id bar --timeout 1h --recv {"type": "poll", "data": {"group": "default", "id": "6fa89b7e-4a56-40e8-ba4e-78864caa3278"}}
`

func CreateNotifyCmd(c client.Client) *cobra.Command {
var (
promiseId string
timeout time.Duration
recvStr string
)
cmd := &cobra.Command{
Use: "create <id>",
Short: "Create notify",
Example: notifyExample,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return errors.New("must specify an id")
}

Check warning on line 39 in cmd/notify/create.go

View check run for this annotation

Codecov / codecov/patch

cmd/notify/create.go#L26-L39

Added lines #L26 - L39 were not covered by tests

id := args[0]

var recv v1.Recv

if json.Valid([]byte(recvStr)) {
var recv0 v1.Recv0

if err := json.Unmarshal([]byte(recvStr), &recv0); err != nil {
return err
}
if err := recv.FromRecv0(recv0); err != nil {
return err
}
} else {
if err := recv.FromRecv1(recvStr); err != nil {
return err
}

Check warning on line 57 in cmd/notify/create.go

View check run for this annotation

Codecov / codecov/patch

cmd/notify/create.go#L41-L57

Added lines #L41 - L57 were not covered by tests
}

body := v1.CreateNotifyJSONRequestBody{
Id: id,
PromiseId: promiseId,
Timeout: time.Now().Add(timeout).UnixMilli(),
Recv: recv,
}

res, err := c.V1().CreateNotifyWithResponse(context.TODO(), nil, body)
if err != nil {
return err
}

Check warning on line 70 in cmd/notify/create.go

View check run for this annotation

Codecov / codecov/patch

cmd/notify/create.go#L60-L70

Added lines #L60 - L70 were not covered by tests

if res.StatusCode() == 201 {
cmd.Printf("Created notification: %s\n", id)
} else if res.StatusCode() == 200 {
if res.JSON200.Promise != nil && res.JSON200.Promise.State != v1.PromiseStatePENDING {
cmd.Printf("Promise %s already %s\n", promiseId, strings.ToLower(string(res.JSON200.Promise.State)))
} else {
cmd.Printf("Created notification: %s (deduplicated)\n", id)
}
} else {
cmd.PrintErrln(res.Status(), string(res.Body))
}

Check warning on line 82 in cmd/notify/create.go

View check run for this annotation

Codecov / codecov/patch

cmd/notify/create.go#L72-L82

Added lines #L72 - L82 were not covered by tests

return nil

Check warning on line 84 in cmd/notify/create.go

View check run for this annotation

Codecov / codecov/patch

cmd/notify/create.go#L84

Added line #L84 was not covered by tests
},
}

cmd.Flags().StringVar(&promiseId, "promise-id", "", "promise id")
cmd.Flags().DurationVar(&timeout, "timeout", 0, "task timeout")
cmd.Flags().StringVar(&recvStr, "recv", "default", "task receiver")

_ = cmd.MarkFlagRequired("promise-id")
_ = cmd.MarkFlagRequired("timeout")

return cmd

Check warning on line 95 in cmd/notify/create.go

View check run for this annotation

Codecov / codecov/patch

cmd/notify/create.go#L88-L95

Added lines #L88 - L95 were not covered by tests
}
41 changes: 41 additions & 0 deletions cmd/notify/notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package notify

import (
"github.com/resonatehq/resonate/pkg/client"
"github.com/spf13/cobra"
)

func NewCmd() *cobra.Command {
var (
c = client.New()
server string
username string
password string
)

cmd := &cobra.Command{
Use: "notifications",
Aliases: []string{"notifications"},
Short: "Resonate notifications",
Run: func(cmd *cobra.Command, args []string) {
_ = cmd.Help()
},
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if username != "" || password != "" {
c.SetBasicAuth(username, password)
}

Check warning on line 26 in cmd/notify/notify.go

View check run for this annotation

Codecov / codecov/patch

cmd/notify/notify.go#L8-L26

Added lines #L8 - L26 were not covered by tests

return c.Setup(server)

Check warning on line 28 in cmd/notify/notify.go

View check run for this annotation

Codecov / codecov/patch

cmd/notify/notify.go#L28

Added line #L28 was not covered by tests
},
}

// Add subcommands
cmd.AddCommand(CreateNotifyCmd(c))

// Flags
cmd.PersistentFlags().StringVarP(&server, "server", "", "http://localhost:8001", "resonate url")
cmd.PersistentFlags().StringVarP(&username, "username", "U", "", "basic auth username")
cmd.PersistentFlags().StringVarP(&password, "password", "P", "", "basic auth password")

return cmd

Check warning on line 40 in cmd/notify/notify.go

View check run for this annotation

Codecov / codecov/patch

cmd/notify/notify.go#L33-L40

Added lines #L33 - L40 were not covered by tests
}
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/resonatehq/resonate/cmd/callbacks"
"github.com/resonatehq/resonate/cmd/dst"
"github.com/resonatehq/resonate/cmd/notify"
"github.com/resonatehq/resonate/cmd/promises"
"github.com/resonatehq/resonate/cmd/quickstart"
"github.com/resonatehq/resonate/cmd/schedules"
Expand Down Expand Up @@ -41,6 +42,7 @@ func init() {
rootCmd.AddCommand(quickstart.NewCmd())
rootCmd.AddCommand(tasks.NewCmd())
rootCmd.AddCommand(callbacks.NewCmd())
rootCmd.AddCommand(notify.NewCmd())

// Set default output
rootCmd.SetOut(os.Stdout)
Expand Down
1 change: 1 addition & 0 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(t_api.CreatePromiseAndTask, coroutines.CreatePromiseAndTask)
system.AddOnRequest(t_api.CreateCallback, coroutines.CreateCallback)
system.AddOnRequest(t_api.CreateNotify, coroutines.CreateNotify)

Check warning on line 98 in cmd/serve/serve.go

View check run for this annotation

Codecov / codecov/patch

cmd/serve/serve.go#L98

Added line #L98 was not covered by tests
system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise)
system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule)
system.AddOnRequest(t_api.SearchSchedules, coroutines.SearchSchedules)
Expand Down
140 changes: 140 additions & 0 deletions internal/app/coroutines/createNotify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package coroutines

import (
"fmt"
"log/slog"

"github.com/resonatehq/gocoro"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
"github.com/resonatehq/resonate/internal/util"
"github.com/resonatehq/resonate/pkg/callback"
"github.com/resonatehq/resonate/pkg/message"
"github.com/resonatehq/resonate/pkg/promise"
)

func CreateNotify(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r *t_api.Request) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreateNotify, "Request kind must be CreateNotify")
var res *t_api.Response

// read the promise to see if it exists
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Store: &t_aio.StoreSubmission{
Transaction: &t_aio.Transaction{
Commands: []*t_aio.Command{
{
Kind: t_aio.ReadPromise,
ReadPromise: &t_aio.ReadPromiseCommand{
Id: r.CreateNotify.PromiseId,
},
},
},
},
},
})

if err != nil {
slog.Error("failed to read promise", "req", r, "err", err)
return nil, t_api.NewError(t_api.StatusAIOStoreError, err)
}

Check warning on line 41 in internal/app/coroutines/createNotify.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createNotify.go#L16-L41

Added lines #L16 - L41 were not covered by tests

util.Assert(completion.Store != nil, "completion must not be nil")
util.Assert(len(completion.Store.Results) == 1, "completion must have one result")

result := completion.Store.Results[0].ReadPromise
util.Assert(result != nil, "result must not be nil")
util.Assert(result.RowsReturned == 0 || result.RowsReturned == 1, "result must return 0 or 1 rows")

if result.RowsReturned == 1 {
p, err := result.Records[0].Promise()
if err != nil {
slog.Error("failed to parse promise record", "record", result.Records[0], "err", err)
return nil, t_api.NewError(t_api.StatusAIOStoreError, err)
}

Check warning on line 55 in internal/app/coroutines/createNotify.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createNotify.go#L43-L55

Added lines #L43 - L55 were not covered by tests

// If the notify is already created return 200 and an empty notify
var cb *callback.Callback
status := t_api.StatusOK

if p.State == promise.Pending {
mesg := &message.Mesg{
Type: message.Notify,
Root: r.CreateNotify.PromiseId,
}

createdOn := c.Time()

callbackId := fmt.Sprintf("%s.%s", r.CreateNotify.PromiseId, r.CreateNotify.Id)
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Store: &t_aio.StoreSubmission{
Transaction: &t_aio.Transaction{
Commands: []*t_aio.Command{
{
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.CreateCallbackCommand{
Id: callbackId,
PromiseId: r.CreateNotify.PromiseId,
Recv: r.CreateNotify.Recv,
Mesg: mesg,
Timeout: r.CreateNotify.Timeout,
CreatedOn: createdOn,
},
},
},
},
},
})

if err != nil {
slog.Error("failed to create notify", "req", r, "err", err)
return nil, t_api.NewError(t_api.StatusAIOStoreError, err)
}

Check warning on line 95 in internal/app/coroutines/createNotify.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createNotify.go#L58-L95

Added lines #L58 - L95 were not covered by tests

util.Assert(completion.Store != nil, "completion must not be nil")
util.Assert(len(completion.Store.Results) == 1, "completion must have one result")

result := completion.Store.Results[0].CreateCallback
util.Assert(result != nil, "result must not be nil")
util.Assert(result.RowsAffected == 0 || result.RowsAffected == 1, "result must return 0 or 1 rows")

if result.RowsAffected == 1 {
status = t_api.StatusCreated
cb = &callback.Callback{
Id: callbackId,
PromiseId: r.CreateNotify.PromiseId,
Recv: r.CreateNotify.Recv,
Mesg: mesg,
Timeout: r.CreateNotify.Timeout,
CreatedOn: createdOn,
}
}

Check warning on line 114 in internal/app/coroutines/createNotify.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createNotify.go#L97-L114

Added lines #L97 - L114 were not covered by tests
}

res = &t_api.Response{
Kind: t_api.CreateNotify,
Tags: r.Tags,
CreateNotify: &t_api.CreateNotifyResponse{
// Status could be StatusOk or StatusCreated if the Callback Id was already present
Status: status,
Callback: cb,
Promise: p,
},
}

Check warning on line 126 in internal/app/coroutines/createNotify.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createNotify.go#L117-L126

Added lines #L117 - L126 were not covered by tests

} else {
res = &t_api.Response{
Kind: t_api.CreateNotify,
Tags: r.Tags,
CreateNotify: &t_api.CreateNotifyResponse{
Status: t_api.StatusPromiseNotFound,
},
}
}

Check warning on line 136 in internal/app/coroutines/createNotify.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createNotify.go#L128-L136

Added lines #L128 - L136 were not covered by tests

util.Assert(res != nil, "response must not be nil")
return res, nil

Check warning on line 139 in internal/app/coroutines/createNotify.go

View check run for this annotation

Codecov / codecov/patch

internal/app/coroutines/createNotify.go#L138-L139

Added lines #L138 - L139 were not covered by tests
}
Loading
Loading