Skip to content

Commit

Permalink
Rename notify to subscriptions (#536)
Browse files Browse the repository at this point in the history
* Rename notify to suscriptions

* Rename suscriptions to subscriptions
  • Loading branch information
avillega authored Jan 23, 2025
1 parent 15ba65c commit bbb848a
Show file tree
Hide file tree
Showing 41 changed files with 850 additions and 842 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ gen-proto:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/promise.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/callback_t.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/callback.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/notify.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/subscription.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/schedule_t.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/schedule.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/pb/lock.proto
Expand Down
10 changes: 5 additions & 5 deletions api/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,13 @@ paths:
404:
description: Promise not found

# Notify
/notify:
# Subscriptions
/subscriptions:
post:
tags:
- Notify
summary: Create a Notification
operationId: createNotify
- Subscription
summary: Create a Subscription
operationId: createSubscription
parameters:
- in: header
name: request-id
Expand Down
1 change: 1 addition & 0 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func RunDSTCmd() *cobra.Command {
system.AddOnRequest(t_api.CreatePromiseAndTask, coroutines.CreatePromiseAndTask)
system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise)
system.AddOnRequest(t_api.CreateCallback, coroutines.CreateCallback)
system.AddOnRequest(t_api.CreateSubscription, coroutines.CreateSubscription)
system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule)
system.AddOnRequest(t_api.SearchSchedules, coroutines.SearchSchedules)
system.AddOnRequest(t_api.CreateSchedule, coroutines.CreateSchedule)
Expand Down
4 changes: 2 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (

"github.com/resonatehq/resonate/cmd/callbacks"
"github.com/resonatehq/resonate/cmd/dst"
"github.com/resonatehq/resonate/cmd/notify"
"github.com/resonatehq/resonate/cmd/promises"
"github.com/resonatehq/resonate/cmd/quickstart"
"github.com/resonatehq/resonate/cmd/schedules"
"github.com/resonatehq/resonate/cmd/serve"
"github.com/resonatehq/resonate/cmd/subscriptions"
"github.com/resonatehq/resonate/cmd/tasks"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -42,7 +42,7 @@ func init() {
rootCmd.AddCommand(quickstart.NewCmd())
rootCmd.AddCommand(tasks.NewCmd())
rootCmd.AddCommand(callbacks.NewCmd())
rootCmd.AddCommand(notify.NewCmd())
rootCmd.AddCommand(subscriptions.NewCmd())

// Set default output
rootCmd.SetOut(os.Stdout)
Expand Down
2 changes: 1 addition & 1 deletion cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func ServeCmd() *cobra.Command {
system.AddOnRequest(t_api.CreatePromise, coroutines.CreatePromise)
system.AddOnRequest(t_api.CreatePromiseAndTask, coroutines.CreatePromiseAndTask)
system.AddOnRequest(t_api.CreateCallback, coroutines.CreateCallback)
system.AddOnRequest(t_api.CreateNotify, coroutines.CreateNotify)
system.AddOnRequest(t_api.CreateSubscription, coroutines.CreateSubscription)
system.AddOnRequest(t_api.CompletePromise, coroutines.CompletePromise)
system.AddOnRequest(t_api.ReadSchedule, coroutines.ReadSchedule)
system.AddOnRequest(t_api.SearchSchedules, coroutines.SearchSchedules)
Expand Down
30 changes: 15 additions & 15 deletions cmd/notify/create.go → cmd/subscriptions/create.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package notify
package subscriptions

import (
"context"
Expand All @@ -12,27 +12,27 @@ import (
"github.com/spf13/cobra"
)

var notifyExample = `
# Create a notify
resonate notify create foo --promise-id bar --timeout 1h --recv default
var subscriptionExample = `
# Create a subscription
resonate subscription create foo --promise-id bar --timeout 1h --recv default
# Create a notify with url
resonate notify create foo --promise-id bar --timeout 1h --recv poll://default/6fa89b7e-4a56-40e8-ba4e-78864caa3278
# Create a subscription with url
resonate subscription create foo --promise-id bar --timeout 1h --recv poll://default/6fa89b7e-4a56-40e8-ba4e-78864caa3278
# Create a notify with object
resonate notify create foo --promise-id bar --timeout 1h --recv {"type": "poll", "data": {"group": "default", "id": "6fa89b7e-4a56-40e8-ba4e-78864caa3278"}}
# Create a subscription with object
resonate subscription create foo --promise-id bar --timeout 1h --recv {"type": "poll", "data": {"group": "default", "id": "6fa89b7e-4a56-40e8-ba4e-78864caa3278"}}
`

func CreateNotifyCmd(c client.Client) *cobra.Command {
func CreateSubscriptionCmd(c client.Client) *cobra.Command {
var (
promiseId string
timeout time.Duration
recvStr string
)
cmd := &cobra.Command{
Use: "create <id>",
Short: "Create notify",
Example: notifyExample,
Short: "Create subscription",
Example: subscriptionExample,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return errors.New("must specify an id")
Expand All @@ -57,25 +57,25 @@ func CreateNotifyCmd(c client.Client) *cobra.Command {
}
}

body := v1.CreateNotifyJSONRequestBody{
body := v1.CreateSubscriptionJSONRequestBody{
Id: id,
PromiseId: promiseId,
Timeout: time.Now().Add(timeout).UnixMilli(),
Recv: recv,
}

res, err := c.V1().CreateNotifyWithResponse(context.TODO(), nil, body)
res, err := c.V1().CreateSubscriptionWithResponse(context.TODO(), nil, body)
if err != nil {
return err
}

if res.StatusCode() == 201 {
cmd.Printf("Created notification: %s\n", id)
cmd.Printf("Created subscription: %s\n", id)
} else if res.StatusCode() == 200 {
if res.JSON200.Promise != nil && res.JSON200.Promise.State != v1.PromiseStatePENDING {
cmd.Printf("Promise %s already %s\n", promiseId, strings.ToLower(string(res.JSON200.Promise.State)))
} else {
cmd.Printf("Created notification: %s (deduplicated)\n", id)
cmd.Printf("Created subscription: %s (deduplicated)\n", id)
}
} else {
cmd.PrintErrln(res.Status(), string(res.Body))
Expand Down
10 changes: 5 additions & 5 deletions cmd/notify/notify.go → cmd/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package notify
package subscriptions

import (
"github.com/resonatehq/resonate/pkg/client"
Expand All @@ -14,9 +14,9 @@ func NewCmd() *cobra.Command {
)

cmd := &cobra.Command{
Use: "notifications",
Aliases: []string{"notifications"},
Short: "Resonate notifications",
Use: "subscriptions",
Aliases: []string{"subscriptions"},
Short: "Resonate subscriptions",
Run: func(cmd *cobra.Command, args []string) {
_ = cmd.Help()
},
Expand All @@ -30,7 +30,7 @@ func NewCmd() *cobra.Command {
}

// Add subcommands
cmd.AddCommand(CreateNotifyCmd(c))
cmd.AddCommand(CreateSubscriptionCmd(c))

// Flags
cmd.PersistentFlags().StringVarP(&server, "server", "", "http://localhost:8001", "resonate url")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/resonatehq/resonate/pkg/promise"
)

func CreateNotify(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r *t_api.Request) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreateNotify, "Request kind must be CreateNotify")
func CreateSubscription(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r *t_api.Request) (*t_api.Response, error) {
util.Assert(r.Kind == t_api.CreateSubscription, "Request kind must be CreateSubscription")
var res *t_api.Response

// read the promise to see if it exists
Expand All @@ -27,7 +27,7 @@ func CreateNotify(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],
{
Kind: t_aio.ReadPromise,
ReadPromise: &t_aio.ReadPromiseCommand{
Id: r.CreateNotify.PromiseId,
Id: r.CreateSubscription.PromiseId,
},
},
},
Expand All @@ -54,19 +54,19 @@ func CreateNotify(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],
return nil, t_api.NewError(t_api.StatusAIOStoreError, err)
}

// If the notify is already created return 200 and an empty notify
// If the subscription is already created return 200 and an empty subscription
var cb *callback.Callback
status := t_api.StatusOK

if p.State == promise.Pending {
mesg := &message.Mesg{
Type: message.Notify,
Root: r.CreateNotify.PromiseId,
Root: r.CreateSubscription.PromiseId,
}

createdOn := c.Time()

callbackId := fmt.Sprintf("%s.%s", r.CreateNotify.PromiseId, r.CreateNotify.Id)
callbackId := fmt.Sprintf("%s.%s", r.CreateSubscription.PromiseId, r.CreateSubscription.Id)
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
Kind: t_aio.Store,
Tags: r.Tags,
Expand All @@ -77,10 +77,10 @@ func CreateNotify(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],
Kind: t_aio.CreateCallback,
CreateCallback: &t_aio.CreateCallbackCommand{
Id: callbackId,
PromiseId: r.CreateNotify.PromiseId,
Recv: r.CreateNotify.Recv,
PromiseId: r.CreateSubscription.PromiseId,
Recv: r.CreateSubscription.Recv,
Mesg: mesg,
Timeout: r.CreateNotify.Timeout,
Timeout: r.CreateSubscription.Timeout,
CreatedOn: createdOn,
},
},
Expand All @@ -90,7 +90,7 @@ func CreateNotify(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],
})

if err != nil {
slog.Error("failed to create notify", "req", r, "err", err)
slog.Error("failed to create subscription", "req", r, "err", err)
return nil, t_api.NewError(t_api.StatusAIOStoreError, err)
}

Expand All @@ -105,19 +105,19 @@ func CreateNotify(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],
status = t_api.StatusCreated
cb = &callback.Callback{
Id: callbackId,
PromiseId: r.CreateNotify.PromiseId,
Recv: r.CreateNotify.Recv,
PromiseId: r.CreateSubscription.PromiseId,
Recv: r.CreateSubscription.Recv,
Mesg: mesg,
Timeout: r.CreateNotify.Timeout,
Timeout: r.CreateSubscription.Timeout,
CreatedOn: createdOn,
}
}
}

res = &t_api.Response{
Kind: t_api.CreateNotify,
Kind: t_api.CreateSubscription,
Tags: r.Tags,
CreateNotify: &t_api.CreateNotifyResponse{
CreateSubscription: &t_api.CreateSubscriptionResponse{
// Status could be StatusOk or StatusCreated if the Callback Id was already present
Status: status,
Callback: cb,
Expand All @@ -127,9 +127,9 @@ func CreateNotify(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],

} else {
res = &t_api.Response{
Kind: t_api.CreateNotify,
Kind: t_api.CreateSubscription,
Tags: r.Tags,
CreateNotify: &t_api.CreateNotifyResponse{
CreateSubscription: &t_api.CreateSubscriptionResponse{
Status: t_api.StatusPromiseNotFound,
},
}
Expand Down
4 changes: 2 additions & 2 deletions internal/app/subsystems/api/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func New(a i_api.API, config *Config) (i_api.Subsystem, error) {
server := grpc.NewServer(grpc.UnaryInterceptor(s.log)) // nosemgrep
pb.RegisterPromisesServer(server, s)
pb.RegisterCallbacksServer(server, s)
pb.RegisterNotifyServer(server, s)
pb.RegisterSubscriptionsServer(server, s)
pb.RegisterSchedulesServer(server, s)
pb.RegisterLocksServer(server, s)
pb.RegisterTasksServer(server, s)
Expand Down Expand Up @@ -76,7 +76,7 @@ func (g *Grpc) Stop() error {
type server struct {
pb.UnimplementedPromisesServer
pb.UnimplementedCallbacksServer
pb.UnimplementedNotifyServer
pb.UnimplementedSubscriptionsServer
pb.UnimplementedSchedulesServer
pb.UnimplementedLocksServer
pb.UnimplementedTasksServer
Expand Down
42 changes: 21 additions & 21 deletions internal/app/subsystems/api/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ import (

type grpcTest struct {
*test.API
subsystem api.Subsystem
errors chan error
conn *grpc.ClientConn
promises pb.PromisesClient
callbacks pb.CallbacksClient
notify pb.NotifyClient
schedules pb.SchedulesClient
locks pb.LocksClient
tasks pb.TasksClient
subsystem api.Subsystem
errors chan error
conn *grpc.ClientConn
promises pb.PromisesClient
callbacks pb.CallbacksClient
subscriptions pb.SubscriptionsClient
schedules pb.SchedulesClient
locks pb.LocksClient
tasks pb.TasksClient
}

func setup() (*grpcTest, error) {
Expand All @@ -49,16 +49,16 @@ func setup() (*grpcTest, error) {
}

return &grpcTest{
API: api,
subsystem: subsystem,
errors: errors,
conn: conn,
promises: pb.NewPromisesClient(conn),
callbacks: pb.NewCallbacksClient(conn),
notify: pb.NewNotifyClient(conn),
schedules: pb.NewSchedulesClient(conn),
locks: pb.NewLocksClient(conn),
tasks: pb.NewTasksClient(conn),
API: api,
subsystem: subsystem,
errors: errors,
conn: conn,
promises: pb.NewPromisesClient(conn),
callbacks: pb.NewCallbacksClient(conn),
subscriptions: pb.NewSubscriptionsClient(conn),
schedules: pb.NewSchedulesClient(conn),
locks: pb.NewLocksClient(conn),
tasks: pb.NewTasksClient(conn),
}, nil
}

Expand Down Expand Up @@ -110,8 +110,8 @@ func TestGrpc(t *testing.T) {
res, err = grpcTest.promises.CancelPromise(ctx, req)
case *pb.CreateCallbackRequest:
_, err = grpcTest.callbacks.CreateCallback(ctx, req)
case *pb.CreateNotifyRequest:
_, err = grpcTest.notify.CreateNotify(ctx, req)
case *pb.CreateSubscriptionRequest:
_, err = grpcTest.subscriptions.CreateSubscription(ctx, req)
case *pb.ReadScheduleRequest:
_, err = grpcTest.schedules.ReadSchedule(ctx, req)
case *pb.SearchSchedulesRequest:
Expand Down
2 changes: 1 addition & 1 deletion internal/app/subsystems/api/grpc/pb/callback.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/app/subsystems/api/grpc/pb/callback_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/app/subsystems/api/grpc/pb/callback_t.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/app/subsystems/api/grpc/pb/lock.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/app/subsystems/api/grpc/pb/lock_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit bbb848a

Please sign in to comment.