File tree Expand file tree Collapse file tree 1 file changed +30
-1
lines changed
Expand file tree Collapse file tree 1 file changed +30
-1
lines changed Original file line number Diff line number Diff line change @@ -263,7 +263,36 @@ func (q *PGMQQueue) Stats(tenantId int64, queue string) models.QueueStats {
263263}
264264
265265func (q * PGMQQueue ) Filter (tenantId int64 , queue string , filterCriteria models.FilterCriteria ) []int64 {
266- return nil
266+ var messageIds []int64
267+
268+ tableName := buildTenantQueueTableName (tenantId , queue )
269+
270+ args := make ([]any , 0 )
271+
272+ whereConditions := make ([]string , 0 )
273+
274+ if filterCriteria .MessageID > 0 {
275+ whereConditions = append (whereConditions , "msg_id = ?" )
276+ args = append (args , filterCriteria .MessageID )
277+ }
278+
279+ for k , v := range filterCriteria .KV {
280+ whereConditions = append (whereConditions , "message->'headers'->>? = ?" )
281+ args = append (args , k , v )
282+ }
283+
284+ whereClause := ""
285+ if len (whereConditions ) > 0 {
286+ whereClause = "WHERE " + strings .Join (whereConditions , " AND " )
287+ }
288+
289+ sql := fmt .Sprintf ("SELECT msg_id FROM %s %s LIMIT 10" , tableName , whereClause )
290+ res := q .DB .Raw (sql , args ... ).Scan (& messageIds )
291+ if res .Error != nil {
292+ log .Error ().Err (res .Error ).Msg ("Unable to filter" )
293+ }
294+
295+ return messageIds
267296}
268297
269298func (q * PGMQQueue ) Delete (tenantId int64 , queue string , messageId int64 ) error {
You can’t perform that action at this time.
0 commit comments