Skip to content

Aligned status emmiting #214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: python
version: 10.2.0
version: 10.3.0
schema: 1
scm: github.com/pubnub/python
sdks:
Expand All @@ -18,7 +18,7 @@ sdks:
distributions:
- distribution-type: library
distribution-repository: package
package-name: pubnub-10.2.0
package-name: pubnub-10.3.0
location: https://pypi.org/project/pubnub/
supported-platforms:
supported-operating-systems:
Expand Down Expand Up @@ -91,8 +91,8 @@ sdks:
-
distribution-type: library
distribution-repository: git release
package-name: pubnub-10.2.0
location: https://github.com/pubnub/python/releases/download/10.2.0/pubnub-10.2.0.tar.gz
package-name: pubnub-10.3.0
location: https://github.com/pubnub/python/releases/download/10.3.0/pubnub-10.3.0.tar.gz
supported-platforms:
supported-operating-systems:
Linux:
Expand Down Expand Up @@ -163,6 +163,11 @@ sdks:
license-url: https://github.com/encode/httpx/blob/master/LICENSE.md
is-required: Required
changelog:
- date: 2025-04-10
version: 10.3.0
changes:
- type: feature
text: "Additional status emission during subscription."
- date: 2025-02-07
version: 10.2.0
changes:
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 10.3.0
April 10 2025

#### Added
- Additional status emission during subscription.

## 10.2.0
February 07 2025

Expand Down
2 changes: 2 additions & 0 deletions pubnub/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class PNStatusCategory(Enum):
PNTLSConnectionFailedCategory = 15
PNTLSUntrustedCertificateCategory = 16
PNInternalExceptionCategory = 17
PNSubscriptionChangedCategory = 18
PNConnectionErrorCategory = 19


class PNOperationType(object):
Expand Down
13 changes: 13 additions & 0 deletions pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ async def receive_messages_async(self, channels, groups, timetoken, region):
recieve_failure = events.ReceiveFailureEvent('Empty response', 1, timetoken=timetoken)
self.event_engine.trigger(recieve_failure)
elif response.status.error:
if self.stop_event.is_set():
self.logger.debug(f'Recieve messages cancelled: {response.status.error_data.__dict__}')
return
self.logger.warning(f'Recieve messages failed: {response.status.error_data.__dict__}')
recieve_failure = events.ReceiveFailureEvent(response.status.error_data, 1, timetoken=timetoken)
self.event_engine.trigger(recieve_failure)
Expand Down Expand Up @@ -437,6 +440,9 @@ def set_pn(self, pubnub: PubNub):
self.message_worker = BaseMessageWorker(pubnub)

def emit(self, invocation: invocations.PNEmittableInvocation):
if isinstance(invocation, list):
for inv in invocation:
self.emit(inv)
if isinstance(invocation, invocations.EmitMessagesInvocation):
self.emit_message(invocation)
if isinstance(invocation, invocations.EmitStatusInvocation):
Expand All @@ -449,8 +455,15 @@ def emit_message(self, invocation: invocations.EmitMessagesInvocation):
self.message_worker._process_incoming_payload(subscribe_message)

def emit_status(self, invocation: invocations.EmitStatusInvocation):
if isinstance(invocation.status, PNStatus):
self.pubnub._subscription_manager._listener_manager.announce_status(invocation.status)
return
pn_status = PNStatus()
pn_status.category = invocation.status
pn_status.operation = invocation.operation
if invocation.context and invocation.context.channels:
pn_status.affected_channels = invocation.context.channels
if invocation.context and invocation.context.groups:
pn_status.affected_groups = invocation.context.groups
pn_status.error = False
self.pubnub._subscription_manager._listener_manager.announce_status(pn_status)
10 changes: 8 additions & 2 deletions pubnub/event_engine/models/invocations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Union
from typing import List, Optional, Union
from pubnub.exceptions import PubNubException
from pubnub.enums import PNOperationType, PNStatusCategory

Expand Down Expand Up @@ -90,10 +90,16 @@ def __init__(self, messages: Union[None, List[str]]) -> None:


class EmitStatusInvocation(PNEmittableInvocation):
def __init__(self, status: Union[None, PNStatusCategory], operation: Union[None, PNOperationType] = None) -> None:
def __init__(
self,
status: Optional[PNStatusCategory],
operation: Optional[PNOperationType] = None,
context=None,
) -> None:
super().__init__()
self.status = status
self.operation = operation
self.context = context


"""
Expand Down
125 changes: 100 additions & 25 deletions pubnub/event_engine/models/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pubnub.event_engine.models import events
from pubnub.exceptions import PubNubException
from typing import List, Union
from pubnub.models.consumer.pn_error_data import PNErrorData


class PNContext(dict):
Expand Down Expand Up @@ -122,7 +123,15 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:

return PNTransition(
state=HandshakingState,
context=self._context
context=self._context,
invocation=[
invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
]
)

def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition:
Expand All @@ -148,7 +157,7 @@ def reconnecting(self, event: events.HandshakeFailureEvent, context: PNContext)

return PNTransition(
state=HandshakeReconnectingState,
context=self._context
context=self._context,
)

def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -183,8 +192,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
invocation=[
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
]
)


Expand Down Expand Up @@ -218,7 +233,10 @@ def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTra

return PNTransition(
state=HandshakeStoppedState,
context=self._context
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context)
)

def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition:
Expand All @@ -230,7 +248,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:

return PNTransition(
state=HandshakeReconnectingState,
context=self._context
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context)
)

def handshake_reconnect(self, event: events.HandshakeReconnectFailureEvent, context: PNContext) -> PNTransition:
Expand All @@ -240,7 +261,7 @@ def handshake_reconnect(self, event: events.HandshakeReconnectFailureEvent, cont

return PNTransition(
state=HandshakeReconnectingState,
context=self._context
context=self._context,
)

def give_up(self, event: events.HandshakeReconnectGiveupEvent, context: PNContext) -> PNTransition:
Expand All @@ -252,8 +273,15 @@ def give_up(self, event: events.HandshakeReconnectGiveupEvent, context: PNContex
if isinstance(event, Exception) and 'status' in event.reason:
status_invocation = invocations.EmitStatusInvocation(status=event.reason.status.category,
operation=PNOperationType.PNUnsubscribeOperation)
elif isinstance(context.reason, PNErrorData):
status_invocation = invocations.EmitStatusInvocation(PNStatusCategory.PNConnectionErrorCategory,
context=self._context)
elif isinstance(context.reason, PubNubException):
status = context.reason.status
status.category = PNStatusCategory.PNConnectionErrorCategory
status_invocation = invocations.EmitStatusInvocation(status)
else:
status_invocation = invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory)
status_invocation = invocations.EmitStatusInvocation(PNStatusCategory.PNConnectionErrorCategory)

return PNTransition(
state=HandshakeFailedState,
Expand Down Expand Up @@ -305,7 +333,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:

return PNTransition(
state=HandshakingState,
context=self._context
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context)
)

def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -340,8 +371,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
invocation=[
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
]
)


Expand Down Expand Up @@ -374,8 +411,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
invocation=[
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
]
)


Expand Down Expand Up @@ -412,7 +455,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:

return PNTransition(
state=self.__class__,
context=self._context
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context)
)

def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -446,7 +492,7 @@ def receiving_failure(self, event: events.ReceiveFailureEvent, context: PNContex
self._context.timetoken = event.timetoken
return PNTransition(
state=ReceiveReconnectingState,
context=self._context
context=self._context,
)

def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -477,8 +523,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
invocation=[
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
]
)


Expand Down Expand Up @@ -515,7 +567,10 @@ def reconnect_failure(self, event: events.ReceiveReconnectFailureEvent, context:

return PNTransition(
state=ReceiveReconnectingState,
context=self._context
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.UnexpectedDisconnectCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context)
)

def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition:
Expand All @@ -527,7 +582,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:

return PNTransition(
state=ReceiveReconnectingState,
context=self._context
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context)
)

def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
Expand All @@ -546,7 +604,9 @@ def give_up(self, event: events.ReceiveReconnectGiveupEvent, context: PNContext)
return PNTransition(
state=ReceiveFailedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory)
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNUnexpectedDisconnectCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context)
)

def reconnect_success(self, event: events.ReceiveReconnectSuccessEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -602,7 +662,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:

return PNTransition(
state=ReceivingState,
context=self._context
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context)
)

def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -637,8 +700,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
invocation=[
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
]
)


Expand Down Expand Up @@ -671,8 +740,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
invocation=[
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNSubscribeOperation,
context=self._context),
]
)


Expand Down
1 change: 1 addition & 0 deletions pubnub/pubnub_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ def __init__(self):
self.error_queue = Queue()

def status(self, pubnub, status):
super().status(pubnub, status)
if utils.is_subscribed_event(status) and not self.connected_event.is_set():
self.connected_event.set()
elif utils.is_unsubscribed_event(status) and not self.disconnected_event.is_set():
Expand Down
Loading
Loading