Skip to content

Commit 0c83b72

Browse files
Aligned status emmiting (#214)
* Aligned status emmiting * Tests use new category * improve error handling * PubNub SDK 10.3.0 release. --------- Co-authored-by: PubNub Release Bot <[email protected]>
1 parent 0656e6a commit 0c83b72

File tree

14 files changed

+167
-44
lines changed

14 files changed

+167
-44
lines changed

.pubnub.yml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: python
2-
version: 10.2.0
2+
version: 10.3.0
33
schema: 1
44
scm: github.com/pubnub/python
55
sdks:
@@ -18,7 +18,7 @@ sdks:
1818
distributions:
1919
- distribution-type: library
2020
distribution-repository: package
21-
package-name: pubnub-10.2.0
21+
package-name: pubnub-10.3.0
2222
location: https://pypi.org/project/pubnub/
2323
supported-platforms:
2424
supported-operating-systems:
@@ -91,8 +91,8 @@ sdks:
9191
-
9292
distribution-type: library
9393
distribution-repository: git release
94-
package-name: pubnub-10.2.0
95-
location: https://github.com/pubnub/python/releases/download/10.2.0/pubnub-10.2.0.tar.gz
94+
package-name: pubnub-10.3.0
95+
location: https://github.com/pubnub/python/releases/download/10.3.0/pubnub-10.3.0.tar.gz
9696
supported-platforms:
9797
supported-operating-systems:
9898
Linux:
@@ -163,6 +163,11 @@ sdks:
163163
license-url: https://github.com/encode/httpx/blob/master/LICENSE.md
164164
is-required: Required
165165
changelog:
166+
- date: 2025-04-10
167+
version: 10.3.0
168+
changes:
169+
- type: feature
170+
text: "Additional status emission during subscription."
166171
- date: 2025-02-07
167172
version: 10.2.0
168173
changes:

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## 10.3.0
2+
April 10 2025
3+
4+
#### Added
5+
- Additional status emission during subscription.
6+
17
## 10.2.0
28
February 07 2025
39

pubnub/enums.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class PNStatusCategory(Enum):
3737
PNTLSConnectionFailedCategory = 15
3838
PNTLSUntrustedCertificateCategory = 16
3939
PNInternalExceptionCategory = 17
40+
PNSubscriptionChangedCategory = 18
41+
PNConnectionErrorCategory = 19
4042

4143

4244
class PNOperationType(object):

pubnub/event_engine/effects.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ async def receive_messages_async(self, channels, groups, timetoken, region):
128128
recieve_failure = events.ReceiveFailureEvent('Empty response', 1, timetoken=timetoken)
129129
self.event_engine.trigger(recieve_failure)
130130
elif response.status.error:
131+
if self.stop_event.is_set():
132+
self.logger.debug(f'Recieve messages cancelled: {response.status.error_data.__dict__}')
133+
return
131134
self.logger.warning(f'Recieve messages failed: {response.status.error_data.__dict__}')
132135
recieve_failure = events.ReceiveFailureEvent(response.status.error_data, 1, timetoken=timetoken)
133136
self.event_engine.trigger(recieve_failure)
@@ -437,6 +440,9 @@ def set_pn(self, pubnub: PubNub):
437440
self.message_worker = BaseMessageWorker(pubnub)
438441

439442
def emit(self, invocation: invocations.PNEmittableInvocation):
443+
if isinstance(invocation, list):
444+
for inv in invocation:
445+
self.emit(inv)
440446
if isinstance(invocation, invocations.EmitMessagesInvocation):
441447
self.emit_message(invocation)
442448
if isinstance(invocation, invocations.EmitStatusInvocation):
@@ -449,8 +455,15 @@ def emit_message(self, invocation: invocations.EmitMessagesInvocation):
449455
self.message_worker._process_incoming_payload(subscribe_message)
450456

451457
def emit_status(self, invocation: invocations.EmitStatusInvocation):
458+
if isinstance(invocation.status, PNStatus):
459+
self.pubnub._subscription_manager._listener_manager.announce_status(invocation.status)
460+
return
452461
pn_status = PNStatus()
453462
pn_status.category = invocation.status
454463
pn_status.operation = invocation.operation
464+
if invocation.context and invocation.context.channels:
465+
pn_status.affected_channels = invocation.context.channels
466+
if invocation.context and invocation.context.groups:
467+
pn_status.affected_groups = invocation.context.groups
455468
pn_status.error = False
456469
self.pubnub._subscription_manager._listener_manager.announce_status(pn_status)

pubnub/event_engine/models/invocations.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import List, Union
1+
from typing import List, Optional, Union
22
from pubnub.exceptions import PubNubException
33
from pubnub.enums import PNOperationType, PNStatusCategory
44

@@ -90,10 +90,16 @@ def __init__(self, messages: Union[None, List[str]]) -> None:
9090

9191

9292
class EmitStatusInvocation(PNEmittableInvocation):
93-
def __init__(self, status: Union[None, PNStatusCategory], operation: Union[None, PNOperationType] = None) -> None:
93+
def __init__(
94+
self,
95+
status: Optional[PNStatusCategory],
96+
operation: Optional[PNOperationType] = None,
97+
context=None,
98+
) -> None:
9499
super().__init__()
95100
self.status = status
96101
self.operation = operation
102+
self.context = context
97103

98104

99105
"""

pubnub/event_engine/models/states.py

Lines changed: 100 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pubnub.event_engine.models import events
55
from pubnub.exceptions import PubNubException
66
from typing import List, Union
7+
from pubnub.models.consumer.pn_error_data import PNErrorData
78

89

910
class PNContext(dict):
@@ -122,7 +123,15 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
122123

123124
return PNTransition(
124125
state=HandshakingState,
125-
context=self._context
126+
context=self._context,
127+
invocation=[
128+
invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
129+
operation=PNOperationType.PNSubscribeOperation,
130+
context=self._context),
131+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
132+
operation=PNOperationType.PNSubscribeOperation,
133+
context=self._context),
134+
]
126135
)
127136

128137
def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition:
@@ -148,7 +157,7 @@ def reconnecting(self, event: events.HandshakeFailureEvent, context: PNContext)
148157

149158
return PNTransition(
150159
state=HandshakeReconnectingState,
151-
context=self._context
160+
context=self._context,
152161
)
153162

154163
def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
@@ -183,8 +192,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
183192
return PNTransition(
184193
state=UnsubscribedState,
185194
context=self._context,
186-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
187-
operation=PNOperationType.PNUnsubscribeOperation)
195+
invocation=[
196+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
197+
operation=PNOperationType.PNSubscribeOperation,
198+
context=self._context),
199+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
200+
operation=PNOperationType.PNSubscribeOperation,
201+
context=self._context),
202+
]
188203
)
189204

190205

@@ -218,7 +233,10 @@ def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTra
218233

219234
return PNTransition(
220235
state=HandshakeStoppedState,
221-
context=self._context
236+
context=self._context,
237+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
238+
operation=PNOperationType.PNSubscribeOperation,
239+
context=self._context)
222240
)
223241

224242
def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition:
@@ -230,7 +248,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
230248

231249
return PNTransition(
232250
state=HandshakeReconnectingState,
233-
context=self._context
251+
context=self._context,
252+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
253+
operation=PNOperationType.PNSubscribeOperation,
254+
context=self._context)
234255
)
235256

236257
def handshake_reconnect(self, event: events.HandshakeReconnectFailureEvent, context: PNContext) -> PNTransition:
@@ -240,7 +261,7 @@ def handshake_reconnect(self, event: events.HandshakeReconnectFailureEvent, cont
240261

241262
return PNTransition(
242263
state=HandshakeReconnectingState,
243-
context=self._context
264+
context=self._context,
244265
)
245266

246267
def give_up(self, event: events.HandshakeReconnectGiveupEvent, context: PNContext) -> PNTransition:
@@ -252,8 +273,15 @@ def give_up(self, event: events.HandshakeReconnectGiveupEvent, context: PNContex
252273
if isinstance(event, Exception) and 'status' in event.reason:
253274
status_invocation = invocations.EmitStatusInvocation(status=event.reason.status.category,
254275
operation=PNOperationType.PNUnsubscribeOperation)
276+
elif isinstance(context.reason, PNErrorData):
277+
status_invocation = invocations.EmitStatusInvocation(PNStatusCategory.PNConnectionErrorCategory,
278+
context=self._context)
279+
elif isinstance(context.reason, PubNubException):
280+
status = context.reason.status
281+
status.category = PNStatusCategory.PNConnectionErrorCategory
282+
status_invocation = invocations.EmitStatusInvocation(status)
255283
else:
256-
status_invocation = invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory)
284+
status_invocation = invocations.EmitStatusInvocation(PNStatusCategory.PNConnectionErrorCategory)
257285

258286
return PNTransition(
259287
state=HandshakeFailedState,
@@ -305,7 +333,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
305333

306334
return PNTransition(
307335
state=HandshakingState,
308-
context=self._context
336+
context=self._context,
337+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
338+
operation=PNOperationType.PNSubscribeOperation,
339+
context=self._context)
309340
)
310341

311342
def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
@@ -340,8 +371,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
340371
return PNTransition(
341372
state=UnsubscribedState,
342373
context=self._context,
343-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
344-
operation=PNOperationType.PNUnsubscribeOperation)
374+
invocation=[
375+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
376+
operation=PNOperationType.PNSubscribeOperation,
377+
context=self._context),
378+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
379+
operation=PNOperationType.PNSubscribeOperation,
380+
context=self._context),
381+
]
345382
)
346383

347384

@@ -374,8 +411,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
374411
return PNTransition(
375412
state=UnsubscribedState,
376413
context=self._context,
377-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
378-
operation=PNOperationType.PNUnsubscribeOperation)
414+
invocation=[
415+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
416+
operation=PNOperationType.PNSubscribeOperation,
417+
context=self._context),
418+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
419+
operation=PNOperationType.PNSubscribeOperation,
420+
context=self._context),
421+
]
379422
)
380423

381424

@@ -412,7 +455,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
412455

413456
return PNTransition(
414457
state=self.__class__,
415-
context=self._context
458+
context=self._context,
459+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
460+
operation=PNOperationType.PNSubscribeOperation,
461+
context=self._context)
416462
)
417463

418464
def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition:
@@ -446,7 +492,7 @@ def receiving_failure(self, event: events.ReceiveFailureEvent, context: PNContex
446492
self._context.timetoken = event.timetoken
447493
return PNTransition(
448494
state=ReceiveReconnectingState,
449-
context=self._context
495+
context=self._context,
450496
)
451497

452498
def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
@@ -477,8 +523,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
477523
return PNTransition(
478524
state=UnsubscribedState,
479525
context=self._context,
480-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
481-
operation=PNOperationType.PNUnsubscribeOperation)
526+
invocation=[
527+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
528+
operation=PNOperationType.PNSubscribeOperation,
529+
context=self._context),
530+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
531+
operation=PNOperationType.PNSubscribeOperation,
532+
context=self._context),
533+
]
482534
)
483535

484536

@@ -515,7 +567,10 @@ def reconnect_failure(self, event: events.ReceiveReconnectFailureEvent, context:
515567

516568
return PNTransition(
517569
state=ReceiveReconnectingState,
518-
context=self._context
570+
context=self._context,
571+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.UnexpectedDisconnectCategory,
572+
operation=PNOperationType.PNSubscribeOperation,
573+
context=self._context)
519574
)
520575

521576
def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition:
@@ -527,7 +582,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
527582

528583
return PNTransition(
529584
state=ReceiveReconnectingState,
530-
context=self._context
585+
context=self._context,
586+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
587+
operation=PNOperationType.PNSubscribeOperation,
588+
context=self._context)
531589
)
532590

533591
def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
@@ -546,7 +604,9 @@ def give_up(self, event: events.ReceiveReconnectGiveupEvent, context: PNContext)
546604
return PNTransition(
547605
state=ReceiveFailedState,
548606
context=self._context,
549-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory)
607+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNUnexpectedDisconnectCategory,
608+
operation=PNOperationType.PNSubscribeOperation,
609+
context=self._context)
550610
)
551611

552612
def reconnect_success(self, event: events.ReceiveReconnectSuccessEvent, context: PNContext) -> PNTransition:
@@ -602,7 +662,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
602662

603663
return PNTransition(
604664
state=ReceivingState,
605-
context=self._context
665+
context=self._context,
666+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
667+
operation=PNOperationType.PNSubscribeOperation,
668+
context=self._context)
606669
)
607670

608671
def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
@@ -637,8 +700,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
637700
return PNTransition(
638701
state=UnsubscribedState,
639702
context=self._context,
640-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
641-
operation=PNOperationType.PNUnsubscribeOperation)
703+
invocation=[
704+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
705+
operation=PNOperationType.PNSubscribeOperation,
706+
context=self._context),
707+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
708+
operation=PNOperationType.PNSubscribeOperation,
709+
context=self._context),
710+
]
642711
)
643712

644713

@@ -671,8 +740,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
671740
return PNTransition(
672741
state=UnsubscribedState,
673742
context=self._context,
674-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
675-
operation=PNOperationType.PNUnsubscribeOperation)
743+
invocation=[
744+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
745+
operation=PNOperationType.PNSubscribeOperation,
746+
context=self._context),
747+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
748+
operation=PNOperationType.PNSubscribeOperation,
749+
context=self._context),
750+
]
676751
)
677752

678753

pubnub/pubnub_asyncio.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ def __init__(self):
559559
self.error_queue = Queue()
560560

561561
def status(self, pubnub, status):
562+
super().status(pubnub, status)
562563
if utils.is_subscribed_event(status) and not self.connected_event.is_set():
563564
self.connected_event.set()
564565
elif utils.is_unsubscribed_event(status) and not self.disconnected_event.is_set():

0 commit comments

Comments
 (0)