Skip to content

Commit

Permalink
[fix]: Resolve string receiver in sender subsystem (#467) (#495)
Browse files Browse the repository at this point in the history
  • Loading branch information
TusharMohapatra07 authored Dec 17, 2024
1 parent cc53141 commit 9e066cd
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 37 deletions.
37 changes: 0 additions & 37 deletions internal/app/subsystems/aio/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"encoding/json"
"fmt"
"log/slog"
"net/url"
"strconv"
"strings"

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/kernel/bus"
Expand Down Expand Up @@ -249,45 +247,10 @@ func coerce(v any) (any, bool) {
case *receiver.Recv:
return v, true
case string:
if recv, ok := schemeToRecv(v); ok {
return recv, true
}

// will be matched against logical receivers in the queueing
// subsystem
return v, true
default:
return nil, false
}
}

func schemeToRecv(v string) (*receiver.Recv, bool) {
u, err := url.Parse(v)
if err != nil {
return nil, false
}

switch u.Scheme {
case "http", "https":
data, err := json.Marshal(map[string]string{"url": u.String()})
if err != nil {
return nil, false
}

return &receiver.Recv{Type: "http", Data: data}, true
case "poll":
rawData := map[string]string{"group": u.Host}
if id := strings.TrimPrefix(u.Path, "/"); id != "" {
rawData["id"] = id
}

data, err := json.Marshal(rawData)
if err != nil {
return nil, false
}

return &receiver.Recv{Type: "poll", Data: data}, true
default:
return nil, false
}
}
37 changes: 37 additions & 0 deletions internal/app/subsystems/aio/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"log/slog"
"net/url"
"strings"

"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/app/plugins/http"
Expand Down Expand Up @@ -219,8 +221,12 @@ func (w *SenderWorker) Process(sqe *bus.SQE[t_aio.Submission, t_aio.Completion])
util.Assert((logicalRecv != nil) != (physicalRecv != nil), "one of logical or physical recv must be nil, but not both")

var recv *receiver.Recv

if logicalRecv != nil {
recv = w.targets[*logicalRecv]
if recv == nil {
recv, _ = schemeToRecv(*logicalRecv)
}
} else {
recv = physicalRecv
}
Expand Down Expand Up @@ -291,3 +297,34 @@ func boolToStatus(b bool) string {
return "failure"
}
}

func schemeToRecv(v string) (*receiver.Recv, bool) {
u, err := url.Parse(v)
if err != nil {
return nil, false
}

switch u.Scheme {
case "http", "https":
data, err := json.Marshal(map[string]string{"url": u.String()})
if err != nil {
return nil, false
}

return &receiver.Recv{Type: "http", Data: data}, true
case "poll":
rawData := map[string]string{"group": u.Host}
if id := strings.TrimPrefix(u.Path, "/"); id != "" {
rawData["id"] = id
}

data, err := json.Marshal(rawData)
if err != nil {
return nil, false
}

return &receiver.Recv{Type: "poll", Data: data}, true
default:
return nil, false
}
}

0 comments on commit 9e066cd

Please sign in to comment.