Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve some comments #383

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions channels_redis/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def acquire(self, channel):

def locked(self, channel):
"""
Return ``True`` if the lock for the given channel is acquired.
Return `True` if the lock for the given channel is acquired.
"""
return self.locks[channel].locked()

Expand Down Expand Up @@ -129,19 +129,19 @@ def __init__(
self.client_prefix = uuid.uuid4().hex
# Set up any encryption objects
self._setup_encryption(symmetric_encryption_keys)
# Number of coroutines trying to receive right now
# Number of coroutines trying to `receive` right now
self.receive_count = 0
# The receive lock
self.receive_lock = None
# Event loop they are trying to receive on
# Event loop they are trying to `receive` on
self.receive_event_loop = None
# Buffered messages by process-local channel name
self.receive_buffer = collections.defaultdict(
functools.partial(BoundedQueue, self.capacity)
)
# Detached channel cleanup tasks
self.receive_cleaners = []
# Per-channel cleanup locks to prevent a receive starting and moving
# Per-channel cleanup locks to prevent a `receive` starting and moving
# a message back into the main queue before its cleanup has completed
self.receive_clean_locks = ChannelLock()

Expand All @@ -166,7 +166,7 @@ def _setup_encryption(self, symmetric_encryption_keys):
else:
self.crypter = None

### Channel layer API ###
# # # Channel layer API # # #

extensions = ["groups", "flush"]

Expand Down Expand Up @@ -274,7 +274,7 @@ async def receive(self, channel):
self.receive_count += 1
try:
if self.receive_count == 1:
# If we're the first coroutine in, create the receive lock!
# If we're the first coroutine in, create the `receive` lock!
self.receive_lock = asyncio.Lock()
self.receive_event_loop = loop
else:
Expand Down Expand Up @@ -329,7 +329,7 @@ async def receive(self, channel):

if message or exception:
if token:
# We will not be receving as we already have the message.
# We will not be receiving as we already have the message.
self.receive_lock.release()

if exception:
Expand All @@ -339,7 +339,7 @@ async def receive(self, channel):
else:
assert token

# We hold the receive lock, receive and then release it.
# We hold the `receive` lock, receive and then release it.
try:
# There is no interruption point from when the message is
# unpacked in receive_single to when we get back here, so
Expand Down Expand Up @@ -370,13 +370,13 @@ async def receive(self, channel):

finally:
self.receive_count -= 1
# If we were the last out, drop the receive lock
# If we were the last out, drop the `receive` lock
if self.receive_count == 0:
assert not self.receive_lock.locked()
self.receive_lock = None
self.receive_event_loop = None
else:
# Do a plain direct receive
# Do a plain direct `receive`
return (await self.receive_single(channel))[1]

async def receive_single(self, channel):
Expand Down Expand Up @@ -441,7 +441,7 @@ async def new_channel(self, prefix="specific"):
"""
return f"{prefix}.{self.client_prefix}!{uuid.uuid4().hex}"

### Flush extension ###
# # # Flush extension # # #

async def flush(self):
"""
Expand Down Expand Up @@ -482,7 +482,7 @@ async def wait_received(self):
if self.receive_cleaners:
await asyncio.wait(self.receive_cleaners[:])

### Groups extension ###
# # # Groups extension # # #

async def group_add(self, group, channel):
"""
Expand Down Expand Up @@ -644,7 +644,7 @@ def _group_key(self, group):
"""
return f"{self.prefix}:group:{group}".encode("utf8")

### Serialization ###
# # # Serialization # # #

def serialize(self, message):
"""
Expand All @@ -654,7 +654,7 @@ def serialize(self, message):
if self.crypter:
value = self.crypter.encrypt(value)

# As we use an sorted set to expire messages we need to guarantee uniqueness, with 12 bytes.
# As we use a sorted set to expire messages we need to guarantee uniqueness, with 12 bytes.
random_prefix = random.getrandbits(8 * 12).to_bytes(12, "big")
return random_prefix + value

Expand All @@ -669,7 +669,7 @@ def deserialize(self, message):
message = self.crypter.decrypt(message, self.expiry + 10)
return msgpack.unpackb(message, raw=False)

### Internal functions ###
# # # Internal functions # # #

def consistent_hash(self, value):
return _consistent_hash(value, self.ring_size)
Expand All @@ -688,7 +688,7 @@ def make_fernet(self, key):
def __str__(self):
return f"{self.__class__.__name__}(hosts={self.hosts})"

### Connection handling ###
# # # Connection handling # # #

def connection(self, index):
"""
Expand Down
6 changes: 3 additions & 3 deletions channels_redis/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ async def receive(self, channel):
try:
message = await q.get()
except (asyncio.CancelledError, asyncio.TimeoutError, GeneratorExit):
# We assume here that the reason we are cancelled is because the consumer
# is exiting, therefore we need to cleanup by unsubscribe below. Indeed,
# We assume here that the reason we are cancelled is that the consumer
# is exiting, therefore we need to clean up by unsubscribing below. Indeed,
# currently the way that Django Channels works, this is a safe assumption.
# In the future, Dajngo Channels could change to call a *new* method that
# In the future, Django Channels could change to call a *new* method that
# would serve as the antithesis of `new_channel()`; this new method might
# be named `delete_channel()`. If that were the case, we would do the
# following cleanup from that new `delete_channel()` method, but, since
Expand Down
2 changes: 1 addition & 1 deletion channels_redis/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def create_pool(host):
Takes the value of the "host" argument and returns a suited connection pool to
the corresponding redis instance.
"""
# avoid side-effects from modifying host
# Avoid side-effects from modifying host
host = host.copy()
if "address" in host:
address = host.pop("address")
Expand Down