Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "pgmq"
version = "1.0.1"
version = "1.0.2"
description = "Python client for the PGMQ Postgres extension."
readme = "README.md"
license = "Apache-2.0"
Expand Down
61 changes: 37 additions & 24 deletions src/pgmq/async_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def _create_partitioned_queue_internal(
):
self.logger.debug(f"Creating partitioned queue '{queue}'")
await conn.execute(
"SELECT pgmq.create($1, $2::text, $3::text);",
"SELECT pgmq.create_partitioned(queue_name=>$1, partition_interval=>$2::text, retention_interval=>$3::text);",
queue,
partition_interval,
retention_interval,
Expand All @@ -131,15 +131,17 @@ async def create_queue(self, queue: str, unlogged: bool = False, conn=None) -> N
async def _create_queue_internal(self, queue, unlogged, conn):
self.logger.debug(f"Creating queue '{queue}' with unlogged={unlogged}")
if unlogged:
await conn.execute("SELECT pgmq.create_unlogged($1);", queue)
await conn.execute("SELECT pgmq.create_unlogged(queue_name=>$1);", queue)
else:
await conn.execute("SELECT pgmq.create($1);", queue)
await conn.execute("SELECT pgmq.create(queue_name=>$1);", queue)

async def validate_queue_name(self, queue_name: str) -> None:
"""Validate the length of a queue name."""
self.logger.debug(f"validate_queue_name called with queue_name='{queue_name}'")
async with self.pool.acquire() as conn:
await conn.execute("SELECT pgmq.validate_queue_name($1);", queue_name)
await conn.execute(
"SELECT pgmq.validate_queue_name(queue_name=>$1);", queue_name
)

@transaction
async def drop_queue(
Expand All @@ -157,7 +159,9 @@ async def drop_queue(

async def _drop_queue_internal(self, queue, partitioned, conn):
result = await conn.fetchrow(
"SELECT pgmq.drop_queue($1, $2);", queue, partitioned
"SELECT pgmq.drop_queue(queue_name=>$1, partitioned=>$2);",
queue,
partitioned,
)
self.logger.debug(f"Queue '{queue}' dropped: {result[0]}")
return result[0]
Expand Down Expand Up @@ -206,21 +210,21 @@ async def _send_internal(
result = None
if delay:
result = await conn.fetchrow(
"SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::integer);",
"SELECT * FROM pgmq.send(queue_name=>$1::text, msg=>$2::jsonb, delay=>$3::integer);",
queue,
dumps(message).decode("utf-8"),
delay,
)
elif tz:
result = await conn.fetchrow(
"SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::timestamptz);",
"SELECT * FROM pgmq.send(queue_name=>$1::text, msg=>$2::jsonb, delay=>$3::timestamptz);",
queue,
dumps(message).decode("utf-8"),
tz,
)
else:
result = await conn.fetchrow(
"SELECT * FROM pgmq.send($1::text, $2::jsonb);",
"SELECT * FROM pgmq.send(queue_name=>$1::text, msg=>$2::jsonb);",
queue,
dumps(message).decode("utf-8"),
)
Expand Down Expand Up @@ -261,21 +265,21 @@ async def _send_batch_internal(
result = None
if delay:
result = await conn.fetch(
"SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3::integer);",
"SELECT * FROM pgmq.send_batch(queue_name=>$1, msgs=>$2::jsonb[], delay=>$3::integer);",
queue,
jsonb_array,
delay,
)
elif tz:
result = await conn.fetch(
"SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3::integer);",
"SELECT * FROM pgmq.send_batch(queue_name=>$1, msgs=>$2::jsonb[], delay=>$3::timestamptz);",
queue,
jsonb_array,
tz,
)
else:
result = await conn.fetch(
"SELECT * FROM pgmq.send_batch($1, $2::jsonb[]);",
"SELECT * FROM pgmq.send_batch(queue_name=>$1, msgs=>$2::jsonb[]);",
queue,
jsonb_array,
)
Expand All @@ -299,7 +303,7 @@ async def read(
async def _read_internal(self, queue, vt, batch_size, conn):
self.logger.debug(f"Reading message from queue '{queue}' with vt={vt}")
rows = await conn.fetch(
"SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);",
"SELECT * FROM pgmq.read(queue_name=>$1::text, vt=>$2::integer, qty=>$3::integer);",
queue,
vt or self.vt,
batch_size,
Expand Down Expand Up @@ -336,7 +340,7 @@ async def _read_batch_internal(self, queue, vt, batch_size, conn):
f"Reading batch of messages from queue '{queue}' with vt={vt}"
)
rows = await conn.fetch(
"SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);",
"SELECT * FROM pgmq.read(queue_name=>$1::text, vt=>$2::integer, qty=>$3::integer);",
queue,
vt or self.vt,
batch_size,
Expand Down Expand Up @@ -384,7 +388,7 @@ async def _read_with_poll_internal(
):
self.logger.debug(f"Reading messages with polling from queue '{queue}'")
rows = await conn.fetch(
"SELECT * FROM pgmq.read_with_poll($1, $2, $3, $4, $5);",
"SELECT * FROM pgmq.read_with_poll(queue_name=>$1, vt=>$2, qty=>$3, max_poll_seconds=>$4, poll_interval_ms=>$5);",
queue,
vt or self.vt,
qty,
Expand Down Expand Up @@ -416,7 +420,7 @@ async def pop(self, queue: str, conn=None) -> Message:

async def _pop_internal(self, queue, conn):
self.logger.debug(f"Popping message from queue '{queue}'")
rows = await conn.fetch("SELECT * FROM pgmq.pop($1);", queue)
rows = await conn.fetch("SELECT * FROM pgmq.pop(queue_name=>$1);", queue)
messages = [
Message(
msg_id=row[0],
Expand Down Expand Up @@ -445,7 +449,7 @@ async def delete(self, queue: str, msg_id: int, conn=None) -> bool:
async def _delete_internal(self, queue, msg_id, conn):
self.logger.debug(f"Deleting message with msg_id={msg_id} from queue '{queue}'")
row = await conn.fetchrow(
"SELECT pgmq.delete($1::text, $2::int);", queue, msg_id
"SELECT pgmq.delete(queue_name=>$1::text, msg_id=>$2::int);", queue, msg_id
)
self.logger.debug(f"Message deleted: {row[0]}")
return row[0]
Expand All @@ -469,7 +473,9 @@ async def _delete_batch_internal(self, queue, msg_ids, conn):
f"Deleting messages with msg_ids={msg_ids} from queue '{queue}'"
)
results = await conn.fetch(
"SELECT * FROM pgmq.delete($1::text, $2::int[]);", queue, msg_ids
"SELECT * FROM pgmq.delete(queue_name=>$1::text, msg_ids=>$2::int[]);",
queue,
msg_ids,
)
deleted_ids = [result[0] for result in results]
self.logger.debug(f"Messages deleted: {deleted_ids}")
Expand All @@ -492,7 +498,7 @@ async def _archive_internal(self, queue, msg_id, conn):
f"Archiving message with msg_id={msg_id} from queue '{queue}'"
)
row = await conn.fetchrow(
"SELECT pgmq.archive($1::text, $2::int);", queue, msg_id
"SELECT pgmq.archive(queue_name=>$1::text, msg_id=>$2::int);", queue, msg_id
)
self.logger.debug(f"Message archived: {row[0]}")
return row[0]
Expand All @@ -516,7 +522,9 @@ async def _archive_batch_internal(self, queue, msg_ids, conn):
f"Archiving messages with msg_ids={msg_ids} from queue '{queue}'"
)
results = await conn.fetch(
"SELECT * FROM pgmq.archive($1::text, $2::int[]);", queue, msg_ids
"SELECT * FROM pgmq.archive(queue_name=>$1::text, msg_ids=>$2::int[]);",
queue,
msg_ids,
)
archived_ids = [result[0] for result in results]
self.logger.debug(f"Messages archived: {archived_ids}")
Expand All @@ -534,7 +542,7 @@ async def purge(self, queue: str, conn=None) -> int:

async def _purge_internal(self, queue, conn):
self.logger.debug(f"Purging queue '{queue}'")
row = await conn.fetchrow("SELECT pgmq.purge_queue($1);", queue)
row = await conn.fetchrow("SELECT pgmq.purge_queue(queue_name=>$1);", queue)
self.logger.debug(f"Messages purged: {row[0]}")
return row[0]

Expand All @@ -550,7 +558,9 @@ async def metrics(self, queue: str, conn=None) -> QueueMetrics:

async def _metrics_internal(self, queue, conn):
self.logger.debug(f"Fetching metrics for queue '{queue}'")
result = await conn.fetchrow("SELECT * FROM pgmq.metrics($1);", queue)
result = await conn.fetchrow(
"SELECT * FROM pgmq.metrics(queue_name=>$1);", queue
)
metrics = QueueMetrics(
queue_name=result[0],
queue_length=result[1],
Expand Down Expand Up @@ -601,12 +611,15 @@ async def set_vt(self, queue: str, msg_id: int, vt: int, conn=None) -> Message:
else:
return await self._set_vt_internal(queue, msg_id, vt, conn)

async def _set_vt_internal(self, queue, msg_id, vt, conn):
async def _set_vt_internal(self, queue: str, msg_id, vt, conn):
self.logger.debug(
f"Setting VT for msg_id={msg_id} in queue '{queue}' to vt={vt}"
)
row = await conn.fetchrow(
"SELECT * FROM pgmq.set_vt($1, $2, $3);", queue, msg_id, vt
"SELECT * FROM pgmq.set_vt(queue_name=>$1, msg_id=>$2, vt=>$3);",
queue,
msg_id,
vt,
)
message = Message(
msg_id=row[0],
Expand All @@ -630,5 +643,5 @@ async def detach_archive(self, queue: str, conn=None) -> None:

async def _detach_archive_internal(self, queue, conn):
self.logger.debug(f"Detaching archive from queue '{queue}'")
await conn.execute("SELECT pgmq.detach_archive($1);", queue)
await conn.execute("SELECT pgmq.detach_archive(queue_name=>$1);", queue)
self.logger.debug(f"Archive detached from queue '{queue}'")
Loading