Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
454 changes: 454 additions & 0 deletions api/comms/chat.go

Large diffs are not rendered by default.

114 changes: 114 additions & 0 deletions api/comms/chat_blast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package comms

import (
"context"
"time"

"bridgerton.audius.co/trashid"
"github.com/jackc/pgx/v5"
)

// Result struct to hold chat_id and to_user_id
type ChatBlastResult struct {
ChatID string `db:"chat_id"`
ToUserID int32 `db:"to_user_id"`
}

type OutgoingChatMessage struct {
ChatMessageRPC ChatMessageRPC `json:"chat_message_rpc"`
}

func chatBlast(tx pgx.Tx, ctx context.Context, userId int32, ts time.Time, params ChatBlastRPCParams) ([]OutgoingChatMessage, error) {
var audienceContentID *int
if params.AudienceContentID != nil {
id, _ := trashid.DecodeHashId(*params.AudienceContentID)
audienceContentID = &id
}

// insert params.Message into chat_blast table
_, err := tx.Exec(ctx, `
insert into chat_blast
(blast_id, from_user_id, audience, audience_content_type, audience_content_id, plaintext, created_at)
values
($1, $2, $3, $4, $5, $6, $7)
on conflict (blast_id)
do nothing
`, params.BlastID, userId, params.Audience, params.AudienceContentType, audienceContentID, params.Message, ts)
if err != nil {
return nil, err
}

// fan out messages to existing threads
// see also: similar but subtly different inverse query in `getNewBlasts helper in chat.go`
var results []ChatBlastResult

fanOutSql := `
WITH targ AS (
SELECT
blast_id,
from_user_id,
to_user_id,
member_b.chat_id
FROM chat_blast
JOIN chat_blast_audience(chat_blast.blast_id) USING (blast_id)
LEFT JOIN chat_member member_a on from_user_id = member_a.user_id
LEFT JOIN chat_member member_b on to_user_id = member_b.user_id and member_b.chat_id = member_a.chat_id
WHERE blast_id = $1
AND member_b.chat_id IS NOT NULL
AND chat_allowed(from_user_id, to_user_id)
),
insert_message AS (
INSERT INTO chat_message
(message_id, chat_id, user_id, created_at, blast_id)
SELECT
blast_id || targ.chat_id, -- this ordering needs to match Misc.BlastMessageID
targ.chat_id,
targ.from_user_id,
$2,
blast_id
FROM targ
ON conflict do nothing
)
SELECT chat_id FROM targ;
`

rows, err := tx.Query(ctx, fanOutSql, params.BlastID, ts)
if err != nil {
return nil, err
}
defer rows.Close()

// Scan the results into the results slice
results, err = pgx.CollectRows(rows, func(row pgx.CollectableRow) (ChatBlastResult, error) {
var result ChatBlastResult
err := row.Scan(&result.ChatID, &result.ToUserID)
return result, err
})
if err != nil {
return nil, err
}

// Formulate chat rpc messages for recipients who have an existing chat with sender
var outgoingMessages []OutgoingChatMessage
for _, result := range results {
messageID := trashid.BlastMessageID(params.BlastID, result.ChatID)

isPlaintext := true
outgoingMessages = append(outgoingMessages, OutgoingChatMessage{
ChatMessageRPC: ChatMessageRPC{
Method: MethodChatMessage,
Params: ChatMessageRPCParams{
ChatID: result.ChatID,
Message: params.Message,
MessageID: messageID,
IsPlaintext: &isPlaintext,
Audience: &params.Audience,
}}})

if err := chatUpdateLatestFields(tx, ctx, result.ChatID); err != nil {
return nil, err
}
}

return outgoingMessages, nil
}
21 changes: 21 additions & 0 deletions api/comms/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package comms

var (
// TODO: verify this is correct
SigHeader = "x-sig"
SignatureTimeToLiveMs = int64(1000 * 60 * 60 * 12) // 12 hours

// Rate limit config
RateLimitRulesBucketName = "rateLimitRules"
RateLimitTimeframeHours = "timeframeHours"
RateLimitMaxNumMessages = "maxNumMessages"
RateLimitMaxNumMessagesPerRecipient = "maxNumMessagesPerRecipient"
RateLimitMaxNumNewChats = "maxNumNewChats"

DefaultRateLimitRules = map[string]int{
RateLimitTimeframeHours: 24,
RateLimitMaxNumMessages: 2000,
RateLimitMaxNumMessagesPerRecipient: 1000,
RateLimitMaxNumNewChats: 100000,
}
)
30 changes: 30 additions & 0 deletions api/comms/rate_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package comms

import (
"sync"
)

func NewRateLimiter() (*RateLimiter, error) {

limiter := &RateLimiter{
limits: map[string]int{},
}

return limiter, nil
}

type RateLimiter struct {
sync.RWMutex
limits map[string]int
}

func (limiter *RateLimiter) Get(rule string) int {
limiter.RLock()
defer limiter.RUnlock()

if val := limiter.limits[rule]; val != 0 {
return val
}

return DefaultRateLimitRules[rule]
}
16 changes: 16 additions & 0 deletions api/comms/raw_rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package comms

import (
"encoding/json"
)

// RawRPC matches (quicktype generated) RPC
// Except Params is a json.RawMessage instead of a quicktype approximation of a golang union type which sadly doesn't really exist.
// which is more generic + convienent to use in go code
// it should match the fields of RPC
type RawRPC struct {
CurrentUserID string `json:"current_user_id"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
Timestamp int64 `json:"timestamp"`
}
20 changes: 20 additions & 0 deletions api/comms/rpc_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package comms

import (
"encoding/json"
"time"
)

// RpcLog was previously used to track messages sent between comms peer servers.
// We are now using it as a record of RPC requests received from clients.
// RelayedAt will be the timestamp when the server received the request.
// RelayedBy will be hard-coded to "bridge" to differentiate it from legacy rpclog messages.
type RpcLog struct {
ID string `db:"id" json:"id"`
RelayedAt time.Time `db:"relayed_at" json:"relayed_at"`
AppliedAt time.Time `db:"applied_at" json:"applied_at"`
RelayedBy string `db:"relayed_by" json:"relayed_by"`
FromWallet string `db:"from_wallet" json:"from_wallet"`
Rpc json.RawMessage `db:"rpc" json:"rpc"`
Sig string `db:"sig" json:"sig"`
}
Loading
Loading