Skip to content

Commit

Permalink
feat(oops): add queuing subsystem to the __other__ dst too (#271)
Browse files Browse the repository at this point in the history
Co-authored-by: Gabriel Guerra <[email protected]>
  • Loading branch information
guergabo and Gabriel Guerra authored Apr 3, 2024
1 parent 0bc15b1 commit afae516
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 36 deletions.
6 changes: 6 additions & 0 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/app/coroutines"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/network"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing"
"github.com/resonatehq/resonate/internal/kernel/system"
"github.com/resonatehq/resonate/internal/kernel/t_aio"
"github.com/resonatehq/resonate/internal/kernel/t_api"
Expand Down Expand Up @@ -93,10 +94,15 @@ func RunDSTCmd() *cobra.Command {
if err != nil {
return err
}
queuing, err := queuing.NewDST()
if err != nil {
return err
}

// add api subsystems
aio.AddSubsystem(t_aio.Network, network)
aio.AddSubsystem(t_aio.Store, store)
aio.AddSubsystem(t_aio.Queuing, queuing)

// start api/aio
if err := api.Start(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func ServeCmd() *cobra.Command {
if err != nil {
return err
}
queuing, err := queuing.NewSubsytemOrDie(config.API.BaseURL, config.AIO.Subsystems.Queuing.Config)
queuing, err := queuing.New(config.API.BaseURL, config.AIO.Subsystems.Queuing.Config)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/app/subsystems/aio/queuing/queuing.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type QueuingSubsystem struct {
stop context.CancelFunc
}

// NewSubsytemOrDie creates a new queuing subsystem with the given config.
func NewSubsytemOrDie(baseURL string, config *Config) (*QueuingSubsystem, error) {
// New creates a new queuing subsystem with the given config.
func New(baseURL string, config *Config) (*QueuingSubsystem, error) {
var (
conns = make(map[string]t_conn.Connection, len(config.Connections))
connSQ = make(map[string]chan *t_conn.ConnectionSubmission, len(config.Connections))
Expand Down
46 changes: 46 additions & 0 deletions internal/app/subsystems/aio/queuing/queuing_dst.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package queuing

import (
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/metadata"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes/t_route"
)

// NewDST is a simple helper functions that wraps New and returns a pre-configured QueuingSubsystem.
// This configurations aligns with the DST tests. Search for: 'id = fmt.Sprintf("/gpu/summarize/%s", id)'
// TOOD: make a real DST one that just mocks network calls.
func NewDST() (*QueuingSubsystem, error) {
queuing, err := New("http://localhost:8001", &Config{
Connections: []*t_conn.ConnectionConfig{
{
Kind: t_conn.HTTP,
Name: "summarize",
Metadata: &metadata.Metadata{
Properties: map[string]interface{}{
"url": "http://localhost:5001",
},
},
},
},
Routes: []*t_route.RoutingConfig{
{
Kind: t_route.Pattern,
Name: "default",
Target: &t_route.Target{
Connection: "summarize",
Queue: "analytics",
},
Metadata: &metadata.Metadata{
Properties: map[string]interface{}{
"pattern": "/gpu/summarize/*",
},
},
},
},
})
if err != nil {
return nil, err
}

return queuing, nil
}
34 changes: 1 addition & 33 deletions test/dst/dst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"github.com/resonatehq/resonate/internal/app/coroutines"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/network"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn"
queuing_metadata "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/metadata"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/routes/t_route"

"github.com/resonatehq/resonate/internal/app/subsystems/aio/store/sqlite"
"github.com/resonatehq/resonate/internal/kernel/system"
Expand Down Expand Up @@ -47,36 +44,7 @@ func TestDST(t *testing.T) {
if err != nil {
t.Fatal(err)
}

// TODO: DST version so just test the kernel.
queuing, err := queuing.NewSubsytemOrDie("http://localhost:8001", &queuing.Config{
Connections: []*t_conn.ConnectionConfig{
{
Kind: t_conn.HTTP,
Name: "summarize",
Metadata: &queuing_metadata.Metadata{
Properties: map[string]interface{}{
"url": "http://localhost:5001",
},
},
},
},
Routes: []*t_route.RoutingConfig{
{
Kind: t_route.Pattern,
Name: "default",
Target: &t_route.Target{
Connection: "summarize",
Queue: "analytics",
},
Metadata: &queuing_metadata.Metadata{
Properties: map[string]interface{}{
"pattern": "/gpu/summarize/*",
},
},
},
},
})
queuing, err := queuing.NewDST()
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions test/dst/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (g *Generator) GenerateSearchPromises(r *rand.Rand, t int64) *t_api.Request
func (g *Generator) GenerateCreatePromise(r *rand.Rand, t int64) *t_api.Request {
id := g.idSet[r.Intn(len(g.idSet))]

// Triggers the task framework.
if RandBool(r) {
id = fmt.Sprintf("/gpu/summarize/%s", id)
}
Expand Down

0 comments on commit afae516

Please sign in to comment.