@@ -2,12 +2,19 @@ package pgmq
22
33import (
44 "context"
5+ "encoding/json"
6+ "fmt"
57 "github.com/craigpastro/pgmq-go"
68 "github.com/poundifdef/smoothmq/config"
79 "github.com/poundifdef/smoothmq/models"
810 "github.com/rs/zerolog/log"
911)
1012
13+ type Envelope struct {
14+ Body string `json:"body"`
15+ Headers map [string ]string `json:"headers"`
16+ }
17+
1118type PGMQQueue struct {
1219 PGMQ * pgmq.PGMQ
1320}
@@ -24,33 +31,94 @@ func NewPGMQQueue(cfg config.PGMQConfig) (*PGMQQueue, error) {
2431 return driver , nil
2532}
2633
34+ func buildTenantQueueName (tenantId int64 , queueName string ) string {
35+ return fmt .Sprintf ("q_%x_%s" , uint64 (tenantId ), queueName )
36+ }
37+
38+ func toMessage (tenantId int64 , in * pgmq.Message ) (* models.Message , error ) {
39+ var envelope Envelope
40+ err := json .Unmarshal (in .Message , & envelope )
41+ if err != nil {
42+ return nil , err
43+ }
44+
45+ return & models.Message {
46+ ID : in .MsgID ,
47+ TenantID : tenantId ,
48+ //QueueID: message.QueueID,
49+ //DeliverAt: int(message.DeliverAt),
50+ //DeliveredAt: int(message.DeliveredAt),
51+ //Tries: message.Tries,
52+ //MaxTries: message.MaxTries,
53+ Message : []byte (envelope .Body ),
54+ KeyValues : envelope .Headers ,
55+ }, nil
56+ }
57+
2758func (q * PGMQQueue ) GetQueue (tenantId int64 , queueName string ) (models.QueueProperties , error ) {
2859 queue := models.QueueProperties {}
2960 return queue , nil
3061}
3162
3263func (q * PGMQQueue ) CreateQueue (tenantId int64 , properties models.QueueProperties ) error {
33- return nil
64+ queueName := buildTenantQueueName (tenantId , properties .Name )
65+ err := q .PGMQ .CreateQueue (context .TODO (), queueName )
66+ return err
3467}
3568
3669func (q * PGMQQueue ) UpdateQueue (tenantId int64 , queue string , properties models.QueueProperties ) error {
3770 return nil
3871}
3972
4073func (q * PGMQQueue ) DeleteQueue (tenantId int64 , queue string ) error {
41- return nil
74+ queueName := buildTenantQueueName (tenantId , queue )
75+ err := q .PGMQ .DropQueue (context .TODO (), queueName )
76+ return err
4277}
4378
4479func (q * PGMQQueue ) ListQueues (tenantId int64 ) ([]string , error ) {
4580 return nil , nil
4681}
4782
4883func (q * PGMQQueue ) Enqueue (tenantId int64 , queue string , message string , kv map [string ]string , delay int ) (int64 , error ) {
49- return 0 , nil
84+ queueName := buildTenantQueueName (tenantId , queue )
85+ envelope := Envelope {
86+ Body : message ,
87+ Headers : kv ,
88+ }
89+ rawMsg , err := json .Marshal (envelope )
90+ if err != nil {
91+ return 0 , err
92+ }
93+ msgId , err := q .PGMQ .Send (context .TODO (), queueName , rawMsg )
94+ return msgId , err
5095}
5196
5297func (q * PGMQQueue ) Dequeue (tenantId int64 , queue string , numToDequeue int , requeueIn int ) ([]* models.Message , error ) {
53- return nil , nil
98+ queueName := buildTenantQueueName (tenantId , queue )
99+ var visibilityTimeoutSeconds int64
100+ visibilityTimeoutSeconds = 0 // Use default
101+
102+ if requeueIn > 0 {
103+ visibilityTimeoutSeconds = int64 (requeueIn )
104+ }
105+
106+ msgs , err := q .PGMQ .ReadBatch (context .TODO (), queueName , visibilityTimeoutSeconds , int64 (numToDequeue ))
107+
108+ if err != nil {
109+ return nil , err
110+ }
111+
112+ out := make ([]* models.Message , len (msgs ))
113+
114+ for i , msg := range msgs {
115+ msg2 , err := toMessage (tenantId , msg )
116+ if err != nil {
117+ return nil , err
118+ }
119+ out [i ] = msg2
120+ }
121+ return out , nil
54122}
55123
56124func (q * PGMQQueue ) Peek (tenantId int64 , queue string , messageId int64 ) * models.Message {
0 commit comments