Description:
We have encountered a memory leak when using PubNubAsyncio in our FastAPI application. It appears to be caused by event loops that are created but not released once they are no longer needed.
Steps to Reproduce:
- Create a PubNubAsyncio instance using asyncio.new_event_loop().
- Use the PubNubAsyncio instance to publish messages.
- Stop the PubNubAsyncio instance using await client.stop().
- Observe that the created event loop is not released, leading to a memory leak.
Code Example:
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List

from pubnub.exceptions import PubNubException
from pubnub.models.consumer.v3.channel import Channel
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_asyncio import PubNubAsyncio
from sqlalchemy.ext.asyncio import AsyncSession
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed


class Pubnub:
    GRANT_TTL = 1440  # minutes

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(PubNubException))
    def __build_client(self, *, user_id: str) -> PubNubAsyncio:
        config = PNConfiguration()
        config.subscribe_key = "your_subscribe_key"
        config.publish_key = "your_publish_key"
        config.secret_key = "your_secret_key"
        config.user_id = user_id
        config.ssl = True
        return PubNubAsyncio(config)

    @asynccontextmanager
    async def pubnub_client_context(self, user_id: str) -> AsyncGenerator[PubNubAsyncio, None]:
        # A new client is built for every use and stopped on exit.
        client = self.__build_client(user_id=user_id)
        try:
            yield client
        finally:
            await client.stop()

    async def publish_message(self, session: AsyncSession, *, input_: dict):
        channel = f"id.{input_['to_user_uuid']}.event"
        async with self.pubnub_client_context(user_id=f"server:{os.getenv('HOSTNAME')}") as client:
            token = await self.__grant_token(client=client, authorized_channels=[channel])
            client.set_token(token=token)
            await client.publish(channel=channel, message=input_).future()

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(PubNubException))
    async def __grant_token(self, *, client: PubNubAsyncio, authorized_channels: List[str]) -> str:
        channels = [Channel.id(channel).read().write() for channel in authorized_channels]
        envelope = await client.grant_token(channels=channels, ttl=self.GRANT_TTL).future()
        return envelope.result.token
Observed Behavior:
After stopping the PubNubAsyncio instance using await client.stop(), the event loop created by asyncio.new_event_loop() is not released, causing a memory leak. Over time, this results in increased memory consumption.
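One way to check this behavior is to count the event loop objects that are still referenced before and after a batch of publish calls. The helper below is a minimal, standard-library-only sketch. The name count_live_event_loops is hypothetical and not part of PubNub or FastAPI, and it assumes the loops in question derive from asyncio.AbstractEventLoop, as the default asyncio loops do.

```python
import asyncio
import gc


def count_live_event_loops() -> int:
    """Count asyncio event loop objects that are still referenced somewhere.

    Hypothetical diagnostic helper; it is not part of any library.
    """
    # Collect unreachable objects first so only referenced loops are counted.
    gc.collect()
    return sum(
        1
        for obj in gc.get_objects()
        # type(obj) is used instead of isinstance() to avoid touching
        # attributes of arbitrary objects tracked by the collector.
        if issubclass(type(obj), asyncio.AbstractEventLoop)
    )
```

Calling it before and after a fixed number of publish_message() calls should show whether the count keeps growing. Closing a loop does not by itself remove it from the count: a closed loop stays in memory for as long as something still holds a reference to it.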
Expected Behavior:
The event loop created by asyncio.new_event_loop() should be properly released when the PubNubAsyncio instance is stopped to prevent memory leaks.
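To illustrate the lifecycle we would expect, here is a minimal standard-library sketch in which whoever creates a loop also closes it. It does not show how PubNubAsyncio is implemented. The name owned_event_loop is hypothetical, and the sketch assumes it is used from synchronous code or a worker thread, because run_until_complete() cannot be called while another loop is already running in the same thread.

```python
import asyncio
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def owned_event_loop() -> Iterator[asyncio.AbstractEventLoop]:
    """Create a private event loop and guarantee that it is closed on exit.

    Hypothetical helper for illustration only.
    """
    loop = asyncio.new_event_loop()
    try:
        yield loop
    finally:
        try:
            # Let pending async generators finish before tearing the loop down.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Always release the loop's selector and internal sockets.
            loop.close()
```

With this pattern, a loop created for a short-lived client is closed as soon as the block ends, so it can be garbage collected once the last reference to it is dropped.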
Environment:
- Python version: 3.12
- PubNub version: 9.1.0
- FastAPI version: 0.109.2
Additional Information:
We have observed this issue consistently in our production environment, leading to increased memory usage over time. Proper release of the event loop after stopping the PubNubAsyncio instance would help mitigate this issue.