Skip to content

Commit 28808be

Browse files
authored
[API-291] Migrate /comms/mutate to bridge (#337)
Our story begins with [this endpoint implementation](https://github.com/AudiusProject/audius-protocol/blob/ab69d7c319b4bdfc4f965547d5f6418d9568dda8/comms/discovery/server/server.go#L133) in the protocol `comms` server... In order to properly handle a mutation request, we need: * The validator logic, including all the DB queries used to verify the various conditions * The `RPCProcessor`, including the utility queries used to write various data * The rate limiter helper used by the validator/processor * Some config constants And then I modified it with: * Changed all query patterns from `sqlx` to `pgx` patterns * Passing request context through instead of using background since this is called as part of a network request * Instead of using hefty shared queries in the validator, updated it to use lighter "exists" style queries that just validate a condition we expect instead of returning whole rows of data for us to parse through in code * Removed all code related to forwarding and receiving `rpcLog` messages between servers. However we are still inserting them and checking for duplicates in the cases where users double submit. Note: I did change the 'relayed by' to a static string since it's required in the DB model. Bonus this will help us differentiate between requests handled by bridge and those handled by legacy DN. TODO: _I have not tested any of this yet_. Will spin it up in a local env with some test users and go through all the functionality I can manage. To be handled in a follow-up: - Re-enabling websocket broadcast
1 parent 39c4d51 commit 28808be

File tree

13 files changed

+2222
-0
lines changed

13 files changed

+2222
-0
lines changed

api/comms/chat.go

Lines changed: 454 additions & 0 deletions
Large diffs are not rendered by default.

api/comms/chat_blast.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package comms
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"bridgerton.audius.co/trashid"
8+
"github.com/jackc/pgx/v5"
9+
)
10+
11+
// Result struct to hold chat_id and to_user_id
12+
type ChatBlastResult struct {
13+
ChatID string `db:"chat_id"`
14+
ToUserID int32 `db:"to_user_id"`
15+
}
16+
17+
type OutgoingChatMessage struct {
18+
ChatMessageRPC ChatMessageRPC `json:"chat_message_rpc"`
19+
}
20+
21+
func chatBlast(tx pgx.Tx, ctx context.Context, userId int32, ts time.Time, params ChatBlastRPCParams) ([]OutgoingChatMessage, error) {
22+
var audienceContentID *int
23+
if params.AudienceContentID != nil {
24+
id, _ := trashid.DecodeHashId(*params.AudienceContentID)
25+
audienceContentID = &id
26+
}
27+
28+
// insert params.Message into chat_blast table
29+
_, err := tx.Exec(ctx, `
30+
insert into chat_blast
31+
(blast_id, from_user_id, audience, audience_content_type, audience_content_id, plaintext, created_at)
32+
values
33+
($1, $2, $3, $4, $5, $6, $7)
34+
on conflict (blast_id)
35+
do nothing
36+
`, params.BlastID, userId, params.Audience, params.AudienceContentType, audienceContentID, params.Message, ts)
37+
if err != nil {
38+
return nil, err
39+
}
40+
41+
// fan out messages to existing threads
42+
// see also: similar but subtly different inverse query in `getNewBlasts helper in chat.go`
43+
var results []ChatBlastResult
44+
45+
fanOutSql := `
46+
WITH targ AS (
47+
SELECT
48+
blast_id,
49+
from_user_id,
50+
to_user_id,
51+
member_b.chat_id
52+
FROM chat_blast
53+
JOIN chat_blast_audience(chat_blast.blast_id) USING (blast_id)
54+
LEFT JOIN chat_member member_a on from_user_id = member_a.user_id
55+
LEFT JOIN chat_member member_b on to_user_id = member_b.user_id and member_b.chat_id = member_a.chat_id
56+
WHERE blast_id = $1
57+
AND member_b.chat_id IS NOT NULL
58+
AND chat_allowed(from_user_id, to_user_id)
59+
),
60+
insert_message AS (
61+
INSERT INTO chat_message
62+
(message_id, chat_id, user_id, created_at, blast_id)
63+
SELECT
64+
blast_id || targ.chat_id, -- this ordering needs to match Misc.BlastMessageID
65+
targ.chat_id,
66+
targ.from_user_id,
67+
$2,
68+
blast_id
69+
FROM targ
70+
ON conflict do nothing
71+
)
72+
SELECT chat_id FROM targ;
73+
`
74+
75+
rows, err := tx.Query(ctx, fanOutSql, params.BlastID, ts)
76+
if err != nil {
77+
return nil, err
78+
}
79+
defer rows.Close()
80+
81+
// Scan the results into the results slice
82+
results, err = pgx.CollectRows(rows, func(row pgx.CollectableRow) (ChatBlastResult, error) {
83+
var result ChatBlastResult
84+
err := row.Scan(&result.ChatID, &result.ToUserID)
85+
return result, err
86+
})
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
// Formulate chat rpc messages for recipients who have an existing chat with sender
92+
var outgoingMessages []OutgoingChatMessage
93+
for _, result := range results {
94+
messageID := trashid.BlastMessageID(params.BlastID, result.ChatID)
95+
96+
isPlaintext := true
97+
outgoingMessages = append(outgoingMessages, OutgoingChatMessage{
98+
ChatMessageRPC: ChatMessageRPC{
99+
Method: MethodChatMessage,
100+
Params: ChatMessageRPCParams{
101+
ChatID: result.ChatID,
102+
Message: params.Message,
103+
MessageID: messageID,
104+
IsPlaintext: &isPlaintext,
105+
Audience: &params.Audience,
106+
}}})
107+
108+
if err := chatUpdateLatestFields(tx, ctx, result.ChatID); err != nil {
109+
return nil, err
110+
}
111+
}
112+
113+
return outgoingMessages, nil
114+
}

api/comms/constants.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package comms
2+
3+
var (
4+
// TODO: verify this is correct
5+
SigHeader = "x-sig"
6+
SignatureTimeToLiveMs = int64(1000 * 60 * 60 * 12) // 12 hours
7+
8+
// Rate limit config
9+
RateLimitRulesBucketName = "rateLimitRules"
10+
RateLimitTimeframeHours = "timeframeHours"
11+
RateLimitMaxNumMessages = "maxNumMessages"
12+
RateLimitMaxNumMessagesPerRecipient = "maxNumMessagesPerRecipient"
13+
RateLimitMaxNumNewChats = "maxNumNewChats"
14+
15+
DefaultRateLimitRules = map[string]int{
16+
RateLimitTimeframeHours: 24,
17+
RateLimitMaxNumMessages: 2000,
18+
RateLimitMaxNumMessagesPerRecipient: 1000,
19+
RateLimitMaxNumNewChats: 100000,
20+
}
21+
)

api/comms/rate_limit.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package comms
2+
3+
import (
4+
"sync"
5+
)
6+
7+
func NewRateLimiter() (*RateLimiter, error) {
8+
9+
limiter := &RateLimiter{
10+
limits: map[string]int{},
11+
}
12+
13+
return limiter, nil
14+
}
15+
16+
type RateLimiter struct {
17+
sync.RWMutex
18+
limits map[string]int
19+
}
20+
21+
func (limiter *RateLimiter) Get(rule string) int {
22+
limiter.RLock()
23+
defer limiter.RUnlock()
24+
25+
if val := limiter.limits[rule]; val != 0 {
26+
return val
27+
}
28+
29+
return DefaultRateLimitRules[rule]
30+
}

api/comms/raw_rpc.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package comms
2+
3+
import (
4+
"encoding/json"
5+
)
6+
7+
// RawRPC matches (quicktype generated) RPC
8+
// Except Params is a json.RawMessage instead of a quicktype approximation of a golang union type which sadly doesn't really exist.
9+
// which is more generic + convienent to use in go code
10+
// it should match the fields of RPC
11+
type RawRPC struct {
12+
CurrentUserID string `json:"current_user_id"`
13+
Method string `json:"method"`
14+
Params json.RawMessage `json:"params"`
15+
Timestamp int64 `json:"timestamp"`
16+
}

api/comms/rpc_log.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package comms
2+
3+
import (
4+
"encoding/json"
5+
"time"
6+
)
7+
8+
// RpcLog was previously used to track messages sent between comms peer servers.
9+
// We are now using it as a record of RPC requests received from clients.
10+
// RelayedAt will be the timestamp when the server received the request.
11+
// RelayedBy will be hard-coded to "bridge" to differentiate it from legacy rpclog messages.
12+
type RpcLog struct {
13+
ID string `db:"id" json:"id"`
14+
RelayedAt time.Time `db:"relayed_at" json:"relayed_at"`
15+
AppliedAt time.Time `db:"applied_at" json:"applied_at"`
16+
RelayedBy string `db:"relayed_by" json:"relayed_by"`
17+
FromWallet string `db:"from_wallet" json:"from_wallet"`
18+
Rpc json.RawMessage `db:"rpc" json:"rpc"`
19+
Sig string `db:"sig" json:"sig"`
20+
}

0 commit comments

Comments
 (0)