diff --git a/.github/workflows/pgmq_python.yml b/.github/workflows/pgmq_python.yml
index b24b018..7fbf9e3 100644
--- a/.github/workflows/pgmq_python.yml
+++ b/.github/workflows/pgmq_python.yml
@@ -73,7 +73,6 @@ jobs:
run: uv python install 3.13
- name: Install project
run: uv sync --all-groups
-
- name: build
run: uv build
- name: Publish
diff --git a/src/pgmq/__init__.py b/src/pgmq/__init__.py
index 8a431a7..0b0f82e 100644
--- a/src/pgmq/__init__.py
+++ b/src/pgmq/__init__.py
@@ -1,4 +1,15 @@
+# src/pgmq/__init__.py
+
from pgmq.queue import Message, PGMQueue # type: ignore
from pgmq.decorators import transaction, async_transaction
+from pgmq.logger import PGMQLogger, create_logger, log_performance
-__all__ = ["Message", "PGMQueue", "transaction", "async_transaction"]
+__all__ = [
+ "Message",
+ "PGMQueue",
+ "transaction",
+ "async_transaction",
+ "PGMQLogger",
+ "create_logger",
+ "log_performance",
+]
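+
+# Usage sketch for the expanded public API (illustrative only; connection
+# parameters are omitted here and assumed to default sensibly):
+#
+#     from pgmq import PGMQueue, create_logger, log_performance
+#
+#     logger = create_logger("my_app", verbose=True)
+#     queue = PGMQueue(verbose=True, structured_logging=True)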
diff --git a/src/pgmq/async_queue.py b/src/pgmq/async_queue.py
index 3b3e432..a8de44c 100644
--- a/src/pgmq/async_queue.py
+++ b/src/pgmq/async_queue.py
@@ -1,4 +1,4 @@
-# async_queue.py
+# src/pgmq/async_queue.py
from dataclasses import dataclass, field
from typing import Optional, List
@@ -11,6 +11,7 @@
from pgmq.messages import Message, QueueMetrics
from pgmq.decorators import async_transaction as transaction
+from pgmq.logger import PGMQLogger, create_logger
@dataclass
@@ -28,6 +29,11 @@ class PGMQueue:
perform_transaction: bool = False
verbose: bool = False
log_filename: Optional[str] = None
+ # New logging options
+ structured_logging: bool = False
+ log_rotation: bool = False
+ log_rotation_size: str = "10 MB"
+ log_retention: str = "1 week"
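+    # Example (sketch): PGMQueue(verbose=True, structured_logging=True,
+    #     log_rotation=True, log_rotation_size="50 MB", log_retention="30 days")
+    # The size/retention strings follow loguru conventions; the
+    # standard-logging fallback rotates at a fixed byte limit instead.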
pool: asyncpg.pool.Pool = field(init=False)
logger: logging.Logger = field(init=False)
@@ -46,23 +52,27 @@ def __post_init__(self) -> None:
self.logger.debug("PGMQueue initialized")
def _initialize_logging(self) -> None:
- self.logger = logging.getLogger(__name__)
-
- if self.verbose:
- log_filename = self.log_filename or datetime.now().strftime(
- "pgmq_async_debug_%Y%m%d_%H%M%S.log"
- )
- file_handler = logging.FileHandler(
- filename=os.path.join(os.getcwd(), log_filename)
- )
- formatter = logging.Formatter(
- "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+ """Initialize logging using the centralized logger module."""
+ # Use create_logger for backward compatibility
+ self.logger = create_logger(
+ name=__name__, verbose=self.verbose, log_filename=self.log_filename
+ )
+
+        # If enhanced features are requested, reconfigure with PGMQLogger
+        if self.structured_logging or self.log_rotation:
+            # Remove existing handlers to avoid duplicates; loguru loggers
+            # manage sinks globally and expose no .handlers attribute
+            if isinstance(self.logger, logging.Logger):
+                for handler in self.logger.handlers[:]:
+                    self.logger.removeHandler(handler)
+            # Drop the cached entry so get_logger rebuilds the logger
+            # instead of returning the one create_logger just cached
+            PGMQLogger._loggers.pop(__name__, None)
+
+            # Get enhanced logger
+            self.logger = PGMQLogger.get_logger(
+                name=__name__,
+                verbose=self.verbose,
+                log_filename=self.log_filename,
+                structured=self.structured_logging,
+                enable_rotation=self.log_rotation,
+                rotation=self.log_rotation_size if self.log_rotation else None,
+                retention=self.log_retention if self.log_rotation else None,
)
- file_handler.setFormatter(formatter)
- self.logger.addHandler(file_handler)
- self.logger.setLevel(logging.DEBUG)
- else:
- self.logger.setLevel(logging.WARNING)
async def init(self, init_extension: bool = True):
self.logger.debug("Creating asyncpg connection pool")
@@ -90,10 +100,13 @@ async def create_partitioned_queue(
conn=None,
) -> None:
"""Create a new partitioned queue."""
- self.logger.debug(
- f"create_partitioned_queue called with queue='{queue}', "
- f"partition_interval={partition_interval}, "
- f"retention_interval={retention_interval}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Creating partitioned queue",
+ queue=queue,
+ partition_interval=partition_interval,
+ retention_interval=retention_interval,
)
if conn is None:
async with self.pool.acquire() as conn:
@@ -119,8 +132,8 @@ async def _create_partitioned_queue_internal(
@transaction
async def create_queue(self, queue: str, unlogged: bool = False, conn=None) -> None:
"""Create a new queue."""
- self.logger.debug(
- f"create_queue called with queue='{queue}', unlogged={unlogged}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Creating queue", queue=queue, unlogged=unlogged
)
if conn is None:
async with self.pool.acquire() as conn:
@@ -137,7 +150,9 @@ async def _create_queue_internal(self, queue, unlogged, conn):
async def validate_queue_name(self, queue_name: str) -> None:
"""Validate the length of a queue name."""
- self.logger.debug(f"validate_queue_name called with queue_name='{queue_name}'")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Validating queue name", queue_name=queue_name
+ )
async with self.pool.acquire() as conn:
await conn.execute("SELECT pgmq.validate_queue_name($1);", queue_name)
@@ -146,8 +161,12 @@ async def drop_queue(
self, queue: str, partitioned: bool = False, conn=None
) -> bool:
"""Drop a queue."""
- self.logger.debug(
- f"drop_queue called with queue='{queue}', partitioned={partitioned}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Dropping queue",
+ queue=queue,
+ partitioned=partitioned,
)
if conn is None:
async with self.pool.acquire() as conn:
@@ -165,7 +184,7 @@ async def _drop_queue_internal(self, queue, partitioned, conn):
@transaction
async def list_queues(self, conn=None) -> List[str]:
"""List all queues."""
- self.logger.debug(f"list_queues called with conn={conn}")
+ PGMQLogger.log_with_context(self.logger, logging.DEBUG, "Listing queues")
if conn is None:
async with self.pool.acquire() as conn:
return await self._list_queues_internal(conn)
@@ -183,9 +202,15 @@ async def send(
self, queue: str, message: dict, delay: int = 0, tz: datetime = None, conn=None
) -> int:
"""Send a message to a queue."""
- self.logger.debug(
- f"send called with queue='{queue}', message={message}, delay={delay}, tz={tz}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Sending message",
+ queue=queue,
+ delay=delay,
+ has_tz=tz is not None,
)
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._send_internal(queue, message, delay, tz, conn)
@@ -224,7 +249,15 @@ async def _send_internal(
queue,
dumps(message).decode("utf-8"),
)
- self.logger.debug(f"Message sent with msg_id={result[0]}")
+
+ # Log success with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Message sent successfully",
+ queue=queue,
+ msg_id=result[0],
+ )
return result[0]
@transaction
@@ -237,9 +270,16 @@ async def send_batch(
conn=None,
) -> List[int]:
"""Send a batch of messages to a queue."""
- self.logger.debug(
- f"send_batch called with queue='{queue}', messages={messages}, delay={delay}, tz={tz}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Sending batch messages",
+ queue=queue,
+ batch_size=len(messages),
+ delay=delay,
+ has_tz=tz is not None,
)
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._send_batch_internal(queue, messages, delay, tz, conn)
@@ -279,8 +319,17 @@ async def _send_batch_internal(
queue,
jsonb_array,
)
+
msg_ids = [message[0] for message in result]
- self.logger.debug(f"Batch messages sent with msg_ids={msg_ids}")
+ # Log success with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Batch messages sent successfully",
+ queue=queue,
+ msg_ids=msg_ids,
+ count=len(msg_ids),
+ )
return msg_ids
@transaction
@@ -288,7 +337,10 @@ async def read(
self, queue: str, vt: Optional[int] = None, conn=None
) -> Optional[Message]:
"""Read a message from a queue."""
- self.logger.debug(f"read called with queue='{queue}', vt={vt}, conn={conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Reading message", queue=queue, vt=vt or self.vt
+ )
+
batch_size = 1
if conn is None:
async with self.pool.acquire() as conn:
@@ -314,17 +366,33 @@ async def _read_internal(self, queue, vt, batch_size, conn):
)
for row in rows
]
- self.logger.debug(f"Message read: {messages[0] if messages else None}")
- return messages[0] if messages else None
+
+ result = messages[0] if messages else None
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Message read completed",
+ queue=queue,
+ msg_id=result.msg_id if result else None,
+ has_message=result is not None,
+ )
+ return result
@transaction
async def read_batch(
self, queue: str, vt: Optional[int] = None, batch_size=1, conn=None
) -> Optional[List[Message]]:
"""Read a batch of messages from a queue."""
- self.logger.debug(
- f"read_batch called with queue='{queue}', vt={vt}, batch_size={batch_size}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Reading batch messages",
+ queue=queue,
+ vt=vt or self.vt,
+ batch_size=batch_size,
)
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._read_batch_internal(queue, vt, batch_size, conn)
@@ -351,7 +419,15 @@ async def _read_batch_internal(self, queue, vt, batch_size, conn):
)
for row in rows
]
- self.logger.debug(f"Batch messages read: {messages}")
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Batch messages read completed",
+ queue=queue,
+ count=len(messages),
+ )
return messages
@transaction
@@ -365,10 +441,17 @@ async def read_with_poll(
conn=None,
) -> Optional[List[Message]]:
"""Read messages from a queue with polling."""
- self.logger.debug(
- f"read_with_poll called with queue='{queue}', vt={vt}, qty={qty}, "
- f"max_poll_seconds={max_poll_seconds}, poll_interval_ms={poll_interval_ms}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Reading messages with poll",
+ queue=queue,
+ vt=vt or self.vt,
+ qty=qty,
+ max_poll_seconds=max_poll_seconds,
+ poll_interval_ms=poll_interval_ms,
)
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._read_with_poll_internal(
@@ -401,13 +484,24 @@ async def _read_with_poll_internal(
)
for row in rows
]
- self.logger.debug(f"Messages read with polling: {messages}")
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Messages read with poll completed",
+ queue=queue,
+ count=len(messages),
+ )
return messages
@transaction
async def pop(self, queue: str, conn=None) -> Message:
"""Pop a message from a queue."""
- self.logger.debug(f"pop called with queue='{queue}', conn={conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Popping message", queue=queue
+ )
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._pop_internal(queue, conn)
@@ -427,15 +521,26 @@ async def _pop_internal(self, queue, conn):
)
for row in rows
]
- self.logger.debug(f"Message popped: {messages[0] if messages else None}")
- return messages[0] if messages else None
+
+ result = messages[0] if messages else None
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Message popped successfully",
+ queue=queue,
+ msg_id=result.msg_id if result else None,
+ has_message=result is not None,
+ )
+ return result
@transaction
async def delete(self, queue: str, msg_id: int, conn=None) -> bool:
"""Delete a message from a queue."""
- self.logger.debug(
- f"delete called with queue='{queue}', msg_id={msg_id}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Deleting message", queue=queue, msg_id=msg_id
)
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._delete_internal(queue, msg_id, conn)
@@ -447,7 +552,16 @@ async def _delete_internal(self, queue, msg_id, conn):
row = await conn.fetchrow(
"SELECT pgmq.delete($1::text, $2::int);", queue, msg_id
)
- self.logger.debug(f"Message deleted: {row[0]}")
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Message deleted",
+ queue=queue,
+ msg_id=msg_id,
+ success=row[0],
+ )
return row[0]
@transaction
@@ -455,9 +569,14 @@ async def delete_batch(
self, queue: str, msg_ids: List[int], conn=None
) -> List[int]:
"""Delete multiple messages from a queue."""
- self.logger.debug(
- f"delete_batch called with queue='{queue}', msg_ids={msg_ids}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Deleting batch messages",
+ queue=queue,
+ msg_ids=msg_ids,
)
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._delete_batch_internal(queue, msg_ids, conn)
@@ -471,16 +590,26 @@ async def _delete_batch_internal(self, queue, msg_ids, conn):
results = await conn.fetch(
"SELECT * FROM pgmq.delete($1::text, $2::int[]);", queue, msg_ids
)
+
deleted_ids = [result[0] for result in results]
- self.logger.debug(f"Messages deleted: {deleted_ids}")
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Batch messages deleted",
+ queue=queue,
+ deleted_ids=deleted_ids,
+ count=len(deleted_ids),
+ )
return deleted_ids
@transaction
async def archive(self, queue: str, msg_id: int, conn=None) -> bool:
"""Archive a message from a queue."""
- self.logger.debug(
- f"archive called with queue='{queue}', msg_id={msg_id}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Archiving message", queue=queue, msg_id=msg_id
)
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._archive_internal(queue, msg_id, conn)
@@ -494,7 +623,16 @@ async def _archive_internal(self, queue, msg_id, conn):
row = await conn.fetchrow(
"SELECT pgmq.archive($1::text, $2::int);", queue, msg_id
)
- self.logger.debug(f"Message archived: {row[0]}")
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Message archived",
+ queue=queue,
+ msg_id=msg_id,
+ success=row[0],
+ )
return row[0]
@transaction
@@ -502,9 +640,14 @@ async def archive_batch(
self, queue: str, msg_ids: List[int], conn=None
) -> List[int]:
"""Archive multiple messages from a queue."""
- self.logger.debug(
- f"archive_batch called with queue='{queue}', msg_ids={msg_ids}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Archiving batch messages",
+ queue=queue,
+ msg_ids=msg_ids,
)
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._archive_batch_internal(queue, msg_ids, conn)
@@ -518,14 +661,26 @@ async def _archive_batch_internal(self, queue, msg_ids, conn):
results = await conn.fetch(
"SELECT * FROM pgmq.archive($1::text, $2::int[]);", queue, msg_ids
)
+
archived_ids = [result[0] for result in results]
- self.logger.debug(f"Messages archived: {archived_ids}")
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Batch messages archived",
+ queue=queue,
+ archived_ids=archived_ids,
+ count=len(archived_ids),
+ )
return archived_ids
@transaction
async def purge(self, queue: str, conn=None) -> int:
"""Purge a queue."""
- self.logger.debug(f"purge called with queue='{queue}', conn={conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Purging queue", queue=queue
+ )
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._purge_internal(queue, conn)
@@ -535,13 +690,20 @@ async def purge(self, queue: str, conn=None) -> int:
async def _purge_internal(self, queue, conn):
self.logger.debug(f"Purging queue '{queue}'")
row = await conn.fetchrow("SELECT pgmq.purge_queue($1);", queue)
- self.logger.debug(f"Messages purged: {row[0]}")
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Queue purged", queue=queue, count=row[0]
+ )
return row[0]
@transaction
async def metrics(self, queue: str, conn=None) -> QueueMetrics:
"""Get metrics for a specific queue."""
- self.logger.debug(f"metrics called with queue='{queue}', conn={conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Getting queue metrics", queue=queue
+ )
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._metrics_internal(queue, conn)
@@ -559,13 +721,25 @@ async def _metrics_internal(self, queue, conn):
total_messages=result[4],
scrape_time=result[5],
)
- self.logger.debug(f"Metrics fetched: {metrics}")
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Queue metrics retrieved",
+ queue=queue,
+ queue_length=metrics.queue_length,
+ total_messages=metrics.total_messages,
+ )
return metrics
@transaction
async def metrics_all(self, conn=None) -> List[QueueMetrics]:
"""Get metrics for all queues."""
- self.logger.debug(f"metrics_all called with conn={conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Getting all queue metrics"
+ )
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._metrics_all_internal(conn)
@@ -586,15 +760,28 @@ async def _metrics_all_internal(self, conn):
)
for row in results
]
- self.logger.debug(f"All metrics fetched: {metrics_list}")
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "All queue metrics retrieved",
+ count=len(metrics_list),
+ )
return metrics_list
@transaction
async def set_vt(self, queue: str, msg_id: int, vt: int, conn=None) -> Message:
"""Set the visibility timeout for a specific message."""
- self.logger.debug(
- f"set_vt called with queue='{queue}', msg_id={msg_id}, vt={vt}, conn={conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Setting visibility timeout",
+ queue=queue,
+ msg_id=msg_id,
+ vt=vt,
)
+
if conn is None:
async with self.pool.acquire() as conn:
return await self._set_vt_internal(queue, msg_id, vt, conn)
@@ -615,13 +802,25 @@ async def _set_vt_internal(self, queue, msg_id, vt, conn):
vt=row[3],
message=loads(row[4]),
)
- self.logger.debug(f"VT set for message: {message}")
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Visibility timeout set",
+ queue=queue,
+ msg_id=msg_id,
+ new_vt=message.vt,
+ )
return message
@transaction
async def detach_archive(self, queue: str, conn=None) -> None:
"""Detach an archive from a queue."""
- self.logger.debug(f"detach_archive called with queue='{queue}', conn={conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Detaching archive", queue=queue
+ )
+
if conn is None:
async with self.pool.acquire() as conn:
await self._detach_archive_internal(queue, conn)
@@ -631,4 +830,8 @@ async def detach_archive(self, queue: str, conn=None) -> None:
async def _detach_archive_internal(self, queue, conn):
self.logger.debug(f"Detaching archive from queue '{queue}'")
await conn.execute("SELECT pgmq.detach_archive($1);", queue)
- self.logger.debug(f"Archive detached from queue '{queue}'")
+
+ # Log completion with context
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Archive detached", queue=queue
+ )
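+
+# A minimal async usage sketch (illustrative, not part of this change;
+# assumes a reachable database and uses only methods defined above):
+#
+#     queue = PGMQueue(verbose=True, structured_logging=True)
+#     await queue.init()
+#     msg_id = await queue.send("tasks", {"hello": "world"})
+#     msg = await queue.read("tasks", vt=30)
+#     if msg is not None:
+#         await queue.delete("tasks", msg.msg_id)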
diff --git a/src/pgmq/logger.py b/src/pgmq/logger.py
new file mode 100644
index 0000000..4e35e86
--- /dev/null
+++ b/src/pgmq/logger.py
@@ -0,0 +1,469 @@
+# src/pgmq/logger.py
+
+import logging
+import logging.handlers
+import os
+import sys
+import functools
+import asyncio
+import time
+from datetime import datetime
+from typing import Optional, Dict, Any, Union
+
+# Try to import loguru, fall back to standard logging if not available
+try:
+ from loguru import logger as loguru_logger
+
+ LOGURU_AVAILABLE = True
+except ImportError:
+ LOGURU_AVAILABLE = False
+
+
+class PGMQLogger:
+ """
+ Centralized logger for PGMQueue with enhanced features.
+ Backward compatible with existing PGMQueue implementation.
+ Supports both standard logging and loguru (if installed).
+ """
+
+ _loggers: Dict[str, Union[logging.Logger, Any]] = {}
+ _configured: bool = False
+ _use_loguru: bool = LOGURU_AVAILABLE
+
+ @classmethod
+ def get_logger(
+ cls,
+ name: str,
+ verbose: bool = False,
+ log_filename: Optional[str] = None,
+ log_format: Optional[str] = None,
+ log_level: Optional[Union[int, str]] = None,
+ enable_rotation: bool = False,
+ max_bytes: int = 10 * 1024 * 1024, # 10MB
+ backup_count: int = 5,
+ structured: bool = False,
+ rotation: Optional[str] = None,
+ retention: Optional[str] = None,
+ compression: Optional[str] = None,
+ ) -> Union[logging.Logger, Any]:
+ """
+ Get or create a logger with the specified configuration.
+
+ Args:
+ name: Logger name
+ verbose: Enable debug logging
+ log_filename: Log file path (auto-generated if None and verbose is True)
+ log_format: Custom log format string
+ log_level: Override log level
+ enable_rotation: Enable log rotation (standard logging only)
+ max_bytes: Maximum bytes before rotation (standard logging only)
+ backup_count: Number of backup files to keep (standard logging only)
+ structured: Enable structured JSON logging
+ rotation: Log rotation setting (loguru only, e.g., "10 MB", "1 day")
+ retention: Log retention setting (loguru only, e.g., "1 week")
+ compression: Compression setting (loguru only, e.g., "gz")
+
+ Returns:
+ Configured logger instance (either logging.Logger or loguru logger)
+ """
+        # Return the cached instance for this name; note that subsequent
+        # calls with different options still receive the original config
+        if name in cls._loggers:
+            return cls._loggers[name]
+
+ # Use loguru if available and not explicitly disabled
+ if cls._use_loguru:
+ logger = cls._get_loguru_logger(
+ name=name,
+ verbose=verbose,
+ log_filename=log_filename,
+ log_format=log_format,
+ log_level=log_level,
+ structured=structured,
+ rotation=rotation,
+ retention=retention,
+ compression=compression,
+ )
+ else:
+ logger = cls._get_standard_logger(
+ name=name,
+ verbose=verbose,
+ log_filename=log_filename,
+ log_format=log_format,
+ log_level=log_level,
+ enable_rotation=enable_rotation,
+ max_bytes=max_bytes,
+ backup_count=backup_count,
+ structured=structured,
+ )
+
+ cls._loggers[name] = logger
+ return logger
+
+ @classmethod
+ def _get_standard_logger(
+ cls,
+ name: str,
+ verbose: bool = False,
+ log_filename: Optional[str] = None,
+ log_format: Optional[str] = None,
+        log_level: Optional[Union[int, str]] = None,
+ enable_rotation: bool = False,
+ max_bytes: int = 10 * 1024 * 1024,
+ backup_count: int = 5,
+ structured: bool = False,
+ ) -> logging.Logger:
+ """Get a standard Python logging logger."""
+ logger = logging.getLogger(name)
+
+ # Skip if already configured with handlers
+ if logger.handlers:
+ return logger
+
+ # Set log level
+ if log_level is not None:
+ logger.setLevel(log_level)
+ elif verbose:
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.setLevel(logging.WARNING)
+
+ # Default format
+ if log_format is None:
+ if structured:
+ log_format = '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "logger": "%(name)s", "message": "%(message)s"}'
+ else:
+ log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+
+ formatter = logging.Formatter(log_format)
+
+ # Console handler
+ console_handler = logging.StreamHandler()
+ console_handler.setFormatter(formatter)
+ logger.addHandler(console_handler)
+
+ # File handler (if verbose or log_filename provided)
+ if verbose or log_filename:
+ if log_filename is None:
+ log_filename = datetime.now().strftime("pgmq_debug_%Y%m%d_%H%M%S.log")
+
+ log_path = os.path.join(os.getcwd(), log_filename)
+
+ if enable_rotation:
+ file_handler = logging.handlers.RotatingFileHandler(
+ filename=log_path, maxBytes=max_bytes, backupCount=backup_count
+ )
+ else:
+ file_handler = logging.FileHandler(filename=log_path)
+
+ file_handler.setFormatter(formatter)
+ logger.addHandler(file_handler)
+
+ return logger
+
+ @classmethod
+ def _get_loguru_logger(
+ cls,
+ name: str,
+ verbose: bool = False,
+ log_filename: Optional[str] = None,
+ log_format: Optional[str] = None,
+ log_level: Optional[Union[int, str]] = None,
+ structured: bool = False,
+ rotation: Optional[str] = None,
+ retention: Optional[str] = None,
+ compression: Optional[str] = None,
+ ) -> Any:
+ """Get a loguru logger."""
+ # Remove default handler
+ loguru_logger.remove()
+
+ # Set log level
+ if log_level is None:
+ log_level = "DEBUG" if verbose else "WARNING"
+ elif isinstance(log_level, int):
+ # Convert standard logging levels to loguru levels
+ level_map = {
+ logging.DEBUG: "DEBUG",
+ logging.INFO: "INFO",
+ logging.WARNING: "WARNING",
+ logging.ERROR: "ERROR",
+ logging.CRITICAL: "CRITICAL",
+ }
+ log_level = level_map.get(log_level, "INFO")
+
+ # Default format
+ if log_format is None:
+ if structured:
+ log_format = '{{"timestamp": "{time:YYYY-MM-DD HH:mm:ss.SSS}", "level": "{level}", "logger": "{extra[logger]}", "message": "{message}"}}'
+ else:
+ log_format = "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {extra[logger]}:{function}:{line} - {message}"
+
+        # Add console handler (backtrace/diagnose enrich tracebacks with
+        # extended context; useful in development, verbose in production)
+ loguru_logger.add(
+ sys.stderr,
+ format=log_format,
+ level=log_level,
+ enqueue=True,
+ backtrace=True,
+ diagnose=True,
+ )
+
+ # Add file handler if needed
+ if verbose or log_filename:
+ if log_filename is None:
+ log_filename = datetime.now().strftime("pgmq_debug_%Y%m%d_%H%M%S.log")
+
+ log_path = os.path.join(os.getcwd(), log_filename)
+
+ loguru_logger.add(
+ log_path,
+ format=log_format,
+ level=log_level,
+ rotation=rotation or "10 MB",
+ retention=retention or "1 week",
+ compression=compression,
+ enqueue=True,
+ backtrace=True,
+ diagnose=True,
+ )
+
+ # Bind logger name to extra context
+ logger = loguru_logger.bind(logger=name)
+ return logger
+
+ @classmethod
+ def configure_global_logging(
+ cls,
+        log_level: Optional[Union[int, str]] = logging.INFO,
+ log_format: Optional[str] = None,
+ structured: bool = False,
+ use_loguru: Optional[bool] = None,
+ ) -> None:
+ """
+ Configure global logging settings for all PGMQ loggers.
+
+ Args:
+ log_level: Default log level
+ log_format: Default log format
+ structured: Enable structured JSON logging
+ use_loguru: Force use of loguru (None = auto-detect)
+ """
+ cls._configured = True
+
+ if use_loguru is not None:
+ cls._use_loguru = use_loguru and LOGURU_AVAILABLE
+
+ if cls._use_loguru:
+ # Configure loguru globally
+ loguru_logger.remove()
+
+ if log_level is None:
+ log_level = "INFO"
+ elif isinstance(log_level, int):
+ level_map = {
+ logging.DEBUG: "DEBUG",
+ logging.INFO: "INFO",
+ logging.WARNING: "WARNING",
+ logging.ERROR: "ERROR",
+ logging.CRITICAL: "CRITICAL",
+ }
+ log_level = level_map.get(log_level, "INFO")
+
+ if log_format is None:
+ if structured:
+ log_format = '{{"timestamp": "{time:YYYY-MM-DD HH:mm:ss.SSS}", "level": "{level}", "logger": "{extra[logger]}", "message": "{message}"}}'
+ else:
+ log_format = "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {extra[logger]} - {message}"
+
+ loguru_logger.add(
+ sys.stderr, format=log_format, level=log_level, enqueue=True
+ )
+ else:
+ # Configure standard logging globally
+ root_logger = logging.getLogger("pgmq")
+ root_logger.setLevel(log_level)
+
+ if log_format is None:
+ if structured:
+ log_format = '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "logger": "%(name)s", "message": "%(message)s"}'
+ else:
+ log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+
+ formatter = logging.Formatter(log_format)
+
+ console_handler = logging.StreamHandler()
+ console_handler.setFormatter(formatter)
+ root_logger.addHandler(console_handler)
+
+ @classmethod
+ def log_with_context(
+ cls,
+ logger: Union[logging.Logger, Any],
+ level: Union[int, str],
+ message: str,
+ **context,
+ ) -> None:
+ """
+ Log a message with additional context.
+
+ Args:
+ logger: Logger instance
+ level: Log level
+ message: Log message
+ **context: Additional context data
+ """
+ if cls._use_loguru:
+ # Use loguru's bind method for context
+ if context:
+ logger = logger.bind(**context)
+
+ # Map standard logging levels to loguru
+ if isinstance(level, int):
+ level_map = {
+ logging.DEBUG: "DEBUG",
+ logging.INFO: "INFO",
+ logging.WARNING: "WARNING",
+ logging.ERROR: "ERROR",
+ logging.CRITICAL: "CRITICAL",
+ }
+ level = level_map.get(level, "INFO")
+
+ logger.log(level, message)
+ else:
+ # Use standard logging
+ if context:
+ context_str = " | ".join([f"{k}={v}" for k, v in context.items()])
+ message = f"{message} | {context_str}"
+
+ logger.log(level, message)
+
+ @classmethod
+ def log_transaction_start(
+ cls, logger: Union[logging.Logger, Any], func_name: str, **context
+ ):
+ """Log the start of a transaction."""
+ cls.log_with_context(
+ logger,
+ logging.DEBUG,
+ f"Transaction started: {func_name}",
+ event="transaction_start",
+ function=func_name,
+ **context,
+ )
+
+ @classmethod
+ def log_transaction_success(
+ cls, logger: Union[logging.Logger, Any], func_name: str, **context
+ ):
+ """Log successful transaction completion."""
+ cls.log_with_context(
+ logger,
+ logging.DEBUG,
+ f"Transaction completed: {func_name}",
+ event="transaction_success",
+ function=func_name,
+ **context,
+ )
+
+ @classmethod
+ def log_transaction_error(
+ cls,
+ logger: Union[logging.Logger, Any],
+ func_name: str,
+ error: Exception,
+ **context,
+ ):
+ """Log transaction error and rollback."""
+ cls.log_with_context(
+ logger,
+ logging.ERROR,
+ f"Transaction failed: {func_name} - {str(error)}",
+ event="transaction_error",
+ function=func_name,
+ error_type=type(error).__name__,
+ error_message=str(error),
+ **context,
+ )
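+
+    # Usage sketch for the transaction-logging helpers above (illustrative;
+    # assumes a logger obtained from get_logger):
+    #
+    #     logger = PGMQLogger.get_logger("pgmq.example", verbose=True)
+    #     PGMQLogger.log_transaction_start(logger, "send", queue="tasks")
+    #     try:
+    #         ...  # transactional work
+    #         PGMQLogger.log_transaction_success(logger, "send", queue="tasks")
+    #     except Exception as exc:
+    #         PGMQLogger.log_transaction_error(logger, "send", exc, queue="tasks")
+    #         raise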
+
+
+# Backward compatibility function
+def create_logger(
+ name: str, verbose: bool = False, log_filename: Optional[str] = None
+) -> Union[logging.Logger, Any]:
+ """
+ Create a logger with backward-compatible interface.
+
+ Args:
+ name: Logger name
+ verbose: Enable debug logging
+ log_filename: Log file path
+
+ Returns:
+ Configured logger instance
+ """
+ return PGMQLogger.get_logger(name=name, verbose=verbose, log_filename=log_filename)
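+
+# Example (sketch): a drop-in replacement for the previous inline setup:
+#     logger = create_logger(__name__, verbose=True, log_filename="pgmq.log")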
+
+
+# Performance decorator for logging
+def log_performance(logger: Union[logging.Logger, Any]):
+ """Decorator to log function performance."""
+
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ start_time = time.time()
+ try:
+ result = func(*args, **kwargs)
+ elapsed = (time.time() - start_time) * 1000
+ PGMQLogger.log_with_context(
+ logger,
+ logging.DEBUG,
+ f"Completed {func.__name__}",
+ function=func.__name__,
+ elapsed_ms=round(elapsed, 2),
+ success=True,
+ )
+ return result
+ except Exception as e:
+ elapsed = (time.time() - start_time) * 1000
+ PGMQLogger.log_with_context(
+ logger,
+ logging.ERROR,
+ f"Failed {func.__name__}: {str(e)}",
+ function=func.__name__,
+ elapsed_ms=round(elapsed, 2),
+ success=False,
+ error=str(e),
+ )
+ raise
+
+ @functools.wraps(func)
+ async def async_wrapper(*args, **kwargs):
+ start_time = time.time()
+ try:
+ result = await func(*args, **kwargs)
+ elapsed = (time.time() - start_time) * 1000
+ PGMQLogger.log_with_context(
+ logger,
+ logging.DEBUG,
+ f"Completed {func.__name__}",
+ function=func.__name__,
+ elapsed_ms=round(elapsed, 2),
+ success=True,
+ )
+ return result
+ except Exception as e:
+ elapsed = (time.time() - start_time) * 1000
+ PGMQLogger.log_with_context(
+ logger,
+ logging.ERROR,
+ f"Failed {func.__name__}: {str(e)}",
+ function=func.__name__,
+ elapsed_ms=round(elapsed, 2),
+ success=False,
+ error=str(e),
+ )
+ raise
+
+ return async_wrapper if asyncio.iscoroutinefunction(func) else wrapper
+
+ return decorator
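+
+
+# Usage sketch for log_performance (illustrative; the queue object and its
+# async send() are assumed from pgmq.async_queue):
+#
+#     logger = create_logger("pgmq.demo", verbose=True)
+#
+#     @log_performance(logger)
+#     async def enqueue_all(queue, payloads):
+#         return [await queue.send("tasks", p) for p in payloads]
+#
+#     # On success this logs "Completed enqueue_all" with elapsed_ms;
+#     # on failure it logs "Failed enqueue_all: ..." and re-raises.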
diff --git a/src/pgmq/queue.py b/src/pgmq/queue.py
index 0e04370..e1d09be 100644
--- a/src/pgmq/queue.py
+++ b/src/pgmq/queue.py
@@ -1,3 +1,5 @@
+# src/pgmq/queue.py
+
from dataclasses import dataclass, field
from typing import Optional, List, Union
from psycopg.types.json import Jsonb
@@ -5,6 +7,7 @@
import os
from pgmq.messages import Message, QueueMetrics
from pgmq.decorators import transaction
+from pgmq.logger import PGMQLogger, create_logger
import logging
import datetime
@@ -25,6 +28,11 @@ class PGMQueue:
verbose: bool = False
log_filename: Optional[str] = None
init_extension: bool = True
+ # New logging options
+ structured_logging: bool = False
+ log_rotation: bool = False
+ log_rotation_size: str = "10 MB"
+ log_retention: str = "1 week"
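+    # Example (sketch): PGMQueue(verbose=True, log_rotation=True,
+    #     log_retention="2 weeks"); see pgmq.logger for how these options
+    #     map onto loguru or the standard-logging fallback.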
pool: ConnectionPool = field(init=False)
logger: logging.Logger = field(init=False)
@@ -43,23 +51,27 @@ def __post_init__(self) -> None:
self._initialize_extensions()
def _initialize_logging(self) -> None:
- self.logger = logging.getLogger(__name__)
+ """Initialize logging using the centralized logger module."""
+ # Use create_logger for backward compatibility
+ self.logger = create_logger(
+ name=__name__, verbose=self.verbose, log_filename=self.log_filename
+ )
- if self.verbose:
- log_filename = self.log_filename or datetime.datetime.now().strftime(
- "pgmq_debug_%Y%m%d_%H%M%S.log"
- )
- file_handler = logging.FileHandler(
- filename=os.path.join(os.getcwd(), log_filename)
- )
- formatter = logging.Formatter(
- "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+        # If enhanced features are requested, reconfigure with PGMQLogger
+        if self.structured_logging or self.log_rotation:
+            # Remove existing handlers to avoid duplicates; loguru loggers
+            # manage sinks globally and expose no .handlers attribute
+            if isinstance(self.logger, logging.Logger):
+                for handler in self.logger.handlers[:]:
+                    self.logger.removeHandler(handler)
+            # Drop the cached entry so get_logger rebuilds the logger
+            # instead of returning the one create_logger just cached
+            PGMQLogger._loggers.pop(__name__, None)
+
+            # Get enhanced logger
+            self.logger = PGMQLogger.get_logger(
+                name=__name__,
+                verbose=self.verbose,
+                log_filename=self.log_filename,
+                structured=self.structured_logging,
+                enable_rotation=self.log_rotation,
+                rotation=self.log_rotation_size if self.log_rotation else None,
+                retention=self.log_retention if self.log_rotation else None,
)
- file_handler.setFormatter(formatter)
- self.logger.addHandler(file_handler)
- self.logger.setLevel(logging.DEBUG)
- else:
- self.logger.setLevel(logging.WARNING)
def _initialize_extensions(self, conn=None) -> None:
self._execute_query("create extension if not exists pgmq cascade;", conn=conn)
@@ -67,8 +79,13 @@ def _initialize_extensions(self, conn=None) -> None:
def _execute_query(
self, query: str, params: Optional[Union[List, tuple]] = None, conn=None
) -> None:
- self.logger.debug(
- f"Executing query: {query} with params: {params} using conn: {conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Executing query",
+ query=query,
+ params=params,
+ conn_id=id(conn) if conn else None,
)
if conn:
conn.execute(query, params)
@@ -79,8 +96,13 @@ def _execute_query(
def _execute_query_with_result(
self, query: str, params: Optional[Union[List, tuple]] = None, conn=None
):
- self.logger.debug(
- f"Executing query with result: {query} with params: {params} using conn: {conn}"
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Executing query with result",
+ query=query,
+ params=params,
+ conn_id=id(conn) if conn else None,
)
if conn:
return conn.execute(query, params).fetchall()
@@ -97,6 +119,14 @@ def create_partitioned_queue(
conn=None,
) -> None:
"""Create a new queue"""
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Creating partitioned queue",
+ queue=queue,
+ partition_interval=partition_interval,
+ retention_interval=retention_interval,
+ )
query = "select pgmq.create(%s, %s::text, %s::text);"
params = [queue, partition_interval, retention_interval]
self._execute_query(query, params, conn=conn)
@@ -104,7 +134,9 @@ def create_partitioned_queue(
@transaction
def create_queue(self, queue: str, unlogged: bool = False, conn=None) -> None:
"""Create a new queue."""
- self.logger.debug(f"create_queue called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Creating queue", queue=queue, unlogged=unlogged
+ )
query = (
"select pgmq.create_unlogged(%s);"
if unlogged
@@ -114,13 +146,22 @@ def create_queue(self, queue: str, unlogged: bool = False, conn=None) -> None:
def validate_queue_name(self, queue_name: str, conn=None) -> None:
"""Validate the length of a queue name."""
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Validating queue name", queue_name=queue_name
+ )
query = "select pgmq.validate_queue_name(%s);"
self._execute_query(query, [queue_name], conn=conn)
@transaction
def drop_queue(self, queue: str, partitioned: bool = False, conn=None) -> bool:
"""Drop a queue."""
- self.logger.debug(f"drop_queue called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Dropping queue",
+ queue=queue,
+ partitioned=partitioned,
+ )
query = "select pgmq.drop_queue(%s, %s);"
result = self._execute_query_with_result(query, [queue, partitioned], conn=conn)
return result[0][0]
@@ -128,7 +169,7 @@ def drop_queue(self, queue: str, partitioned: bool = False, conn=None) -> bool:
@transaction
def list_queues(self, conn=None) -> List[str]:
"""List all queues."""
- self.logger.debug(f"list_queues called with conn: {conn}")
+ PGMQLogger.log_with_context(self.logger, logging.DEBUG, "Listing queues")
query = "select queue_name from pgmq.list_queues();"
rows = self._execute_query_with_result(query, conn=conn)
return [row[0] for row in rows]
@@ -138,7 +179,15 @@ def send(
self, queue: str, message: dict, delay: int = 0, tz: datetime = None, conn=None
) -> int:
"""Send a message to a queue."""
- self.logger.debug(f"send called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Sending message",
+ queue=queue,
+ delay=delay,
+ has_tz=tz is not None,
+ )
+
result = None
if delay:
query = "select * from pgmq.send(%s::text, %s::jsonb, %s::integer);"
@@ -155,6 +204,15 @@ def send(
result = self._execute_query_with_result(
query, [queue, Jsonb(message)], conn=conn
)
+
+ # Log success with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Message sent successfully",
+ queue=queue,
+ msg_id=result[0][0],
+ )
return result[0][0]
@transaction
@@ -167,7 +225,16 @@ def send_batch(
conn=None,
) -> List[int]:
"""Send a batch of messages to a queue."""
- self.logger.debug(f"send_batch called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Sending batch messages",
+ queue=queue,
+ batch_size=len(messages),
+ delay=delay,
+ has_tz=tz is not None,
+ )
+
result = None
if delay:
query = "select * from pgmq.send_batch(%s::text, %s::jsonb[], %s::integer);"
@@ -183,14 +250,28 @@ def send_batch(
query = "select * from pgmq.send_batch(%s::text, %s::jsonb[]);"
params = [queue, [Jsonb(message) for message in messages]]
result = self._execute_query_with_result(query, params, conn=conn)
- return [message[0] for message in result]
+
+ msg_ids = [message[0] for message in result]
+ # Log success with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Batch messages sent successfully",
+ queue=queue,
+ msg_ids=msg_ids,
+ count=len(msg_ids),
+ )
+ return msg_ids
@transaction
def read(
self, queue: str, vt: Optional[int] = None, conn=None
) -> Optional[Message]:
"""Read a message from a queue."""
- self.logger.debug(f"read called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Reading message", queue=queue, vt=vt or self.vt
+ )
+
query = "select * from pgmq.read(%s::text, %s::integer, %s::integer);"
rows = self._execute_query_with_result(
query, [queue, vt or self.vt, 1], conn=conn
@@ -199,23 +280,52 @@ def read(
Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4])
for x in rows
]
- return messages[0] if messages else None
+
+ result = messages[0] if messages else None
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Message read completed",
+ queue=queue,
+ msg_id=result.msg_id if result else None,
+ has_message=result is not None,
+ )
+ return result
@transaction
def read_batch(
self, queue: str, vt: Optional[int] = None, batch_size=1, conn=None
) -> Optional[List[Message]]:
"""Read a batch of messages from a queue."""
- self.logger.debug(f"read_batch called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Reading batch messages",
+ queue=queue,
+ vt=vt or self.vt,
+ batch_size=batch_size,
+ )
+
query = "select * from pgmq.read(%s::text, %s::integer, %s::integer);"
rows = self._execute_query_with_result(
query, [queue, vt or self.vt, batch_size], conn=conn
)
- return [
+ messages = [
Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4])
for x in rows
]
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Batch messages read completed",
+ queue=queue,
+ count=len(messages),
+ )
+ return messages
+
@transaction
def read_with_poll(
self,
@@ -227,74 +337,180 @@ def read_with_poll(
conn=None,
) -> Optional[List[Message]]:
"""Read messages from a queue with polling."""
- self.logger.debug(f"read_with_poll called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Reading messages with poll",
+ queue=queue,
+ vt=vt or self.vt,
+ qty=qty,
+ max_poll_seconds=max_poll_seconds,
+ poll_interval_ms=poll_interval_ms,
+ )
+
query = "select * from pgmq.read_with_poll(%s::text, %s::integer, %s::integer, %s::integer, %s::integer);"
params = [queue, vt or self.vt, qty, max_poll_seconds, poll_interval_ms]
rows = self._execute_query_with_result(query, params, conn=conn)
- return [
+ messages = [
Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4])
for x in rows
]
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Messages read with poll completed",
+ queue=queue,
+ count=len(messages),
+ )
+ return messages
+
@transaction
def pop(self, queue: str, conn=None) -> Message:
"""Pop a message from a queue."""
- self.logger.debug(f"pop called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Popping message", queue=queue
+ )
+
query = "select * from pgmq.pop(%s);"
rows = self._execute_query_with_result(query, [queue], conn=conn)
messages = [
Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4])
for x in rows
]
- return messages[0]
+
+ result = messages[0]
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Message popped successfully",
+ queue=queue,
+ msg_id=result.msg_id,
+ )
+ return result
@transaction
def delete(self, queue: str, msg_id: int, conn=None) -> bool:
"""Delete a message from a queue."""
- self.logger.debug(f"delete called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Deleting message", queue=queue, msg_id=msg_id
+ )
+
query = "select pgmq.delete(%s, %s);"
result = self._execute_query_with_result(query, [queue, msg_id], conn=conn)
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Message deleted",
+ queue=queue,
+ msg_id=msg_id,
+ success=result[0][0],
+ )
return result[0][0]
@transaction
def delete_batch(self, queue: str, msg_ids: List[int], conn=None) -> List[int]:
"""Delete multiple messages from a queue."""
- self.logger.debug(f"delete_batch called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Deleting batch messages",
+ queue=queue,
+ msg_ids=msg_ids,
+ )
+
query = "select * from pgmq.delete(%s, %s);"
result = self._execute_query_with_result(query, [queue, msg_ids], conn=conn)
- return [x[0] for x in result]
+
+ deleted_ids = [x[0] for x in result]
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Batch messages deleted",
+ queue=queue,
+ deleted_ids=deleted_ids,
+ count=len(deleted_ids),
+ )
+ return deleted_ids
@transaction
def archive(self, queue: str, msg_id: int, conn=None) -> bool:
"""Archive a message from a queue."""
- self.logger.debug(f"archive called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Archiving message", queue=queue, msg_id=msg_id
+ )
+
query = "select pgmq.archive(%s, %s);"
result = self._execute_query_with_result(query, [queue, msg_id], conn=conn)
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Message archived",
+ queue=queue,
+ msg_id=msg_id,
+ success=result[0][0],
+ )
return result[0][0]
@transaction
def archive_batch(self, queue: str, msg_ids: List[int], conn=None) -> List[int]:
"""Archive multiple messages from a queue."""
- self.logger.debug(f"archive_batch called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Archiving batch messages",
+ queue=queue,
+ msg_ids=msg_ids,
+ )
+
query = "select * from pgmq.archive(%s, %s);"
result = self._execute_query_with_result(query, [queue, msg_ids], conn=conn)
- return [x[0] for x in result]
+
+ archived_ids = [x[0] for x in result]
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Batch messages archived",
+ queue=queue,
+ archived_ids=archived_ids,
+ count=len(archived_ids),
+ )
+ return archived_ids
@transaction
def purge(self, queue: str, conn=None) -> int:
"""Purge a queue."""
- self.logger.debug(f"purge called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Purging queue", queue=queue
+ )
+
query = "select pgmq.purge_queue(%s);"
result = self._execute_query_with_result(query, [queue], conn=conn)
+
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Queue purged", queue=queue, count=result[0][0]
+ )
return result[0][0]
@transaction
def metrics(self, queue: str, conn=None) -> QueueMetrics:
"""Get metrics for a specific queue."""
- self.logger.debug(f"metrics called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Getting queue metrics", queue=queue
+ )
+
query = "SELECT * FROM pgmq.metrics(%s);"
result = self._execute_query_with_result(query, [queue], conn=conn)[0]
- return QueueMetrics(
+ metrics = QueueMetrics(
queue_name=result[0],
queue_length=result[1],
newest_msg_age_sec=result[2],
@@ -303,13 +519,27 @@ def metrics(self, queue: str, conn=None) -> QueueMetrics:
scrape_time=result[5],
)
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Queue metrics retrieved",
+ queue=queue,
+ queue_length=metrics.queue_length,
+ total_messages=metrics.total_messages,
+ )
+ return metrics
+
@transaction
def metrics_all(self, conn=None) -> List[QueueMetrics]:
"""Get metrics for all queues."""
- self.logger.debug(f"metrics_all called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Getting all queue metrics"
+ )
+
query = "SELECT * FROM pgmq.metrics_all();"
results = self._execute_query_with_result(query, conn=conn)
- return [
+ metrics_list = [
QueueMetrics(
queue_name=row[0],
queue_length=row[1],
@@ -321,15 +551,32 @@ def metrics_all(self, conn=None) -> List[QueueMetrics]:
for row in results
]
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "All queue metrics retrieved",
+ count=len(metrics_list),
+ )
+ return metrics_list
+
@transaction
def set_vt(self, queue: str, msg_id: int, vt: int, conn=None) -> Message:
"""Set the visibility timeout for a specific message."""
- self.logger.debug(f"set_vt called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Setting visibility timeout",
+ queue=queue,
+ msg_id=msg_id,
+ vt=vt,
+ )
+
query = "select * from pgmq.set_vt(%s, %s, %s);"
result = self._execute_query_with_result(query, [queue, msg_id, vt], conn=conn)[
0
]
- return Message(
+ message = Message(
msg_id=result[0],
read_ct=result[1],
enqueued_at=result[2],
@@ -337,9 +584,28 @@ def set_vt(self, queue: str, msg_id: int, vt: int, conn=None) -> Message:
message=result[4],
)
+ # Log result with context
+ PGMQLogger.log_with_context(
+ self.logger,
+ logging.DEBUG,
+ "Visibility timeout set",
+ queue=queue,
+ msg_id=msg_id,
+ new_vt=message.vt,
+ )
+ return message
+
@transaction
def detach_archive(self, queue: str, conn=None) -> None:
"""Detach an archive from a queue."""
- self.logger.debug(f"detach_archive called with conn: {conn}")
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Detaching archive", queue=queue
+ )
+
query = "select pgmq.detach_archive(%s);"
self._execute_query(query, [queue], conn=conn)
+
+ # Log completion with context
+ PGMQLogger.log_with_context(
+ self.logger, logging.DEBUG, "Archive detached", queue=queue
+ )
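+
+# Transactional usage sketch: the @transaction decorator threads a shared
+# `conn` through these methods so related operations commit or roll back
+# together (exact connection handling lives in pgmq.decorators; the
+# pool.connection() context manager is the psycopg_pool API):
+#
+#     with queue.pool.connection() as conn:
+#         msg_id = queue.send("tasks", {"step": 1}, conn=conn)
+#         queue.archive("tasks", msg_id, conn=conn)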