Skip to content

Commit 3483fb6

Browse files
parfeonXavrax
andauthored
Subscribe EE works with presence EE (#170)
Co-authored-by: Xavrax <[email protected]>
1 parent 4906de2 commit 3483fb6

File tree

16 files changed

+448
-79
lines changed

16 files changed

+448
-79
lines changed

examples/subscribe.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
3232

3333
let subscription = client
3434
.subscribe()
35-
.channels(
36-
[
37-
"my_channel".into(),
38-
"other_channel".into(),
39-
"channel-test-history".into(),
40-
]
41-
.to_vec(),
42-
)
35+
.channels(["my_channel".into(), "other_channel".into()].to_vec())
4336
.heartbeat(10)
4437
.filter_expression("some_filter")
4538
.execute()?;

src/core/error.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ pub enum PubNubError {
118118
details: String,
119119
},
120120

121-
///this error is returned when REST API request can't be handled by service.
121+
///this error is returned when REST API request can't be handled by
122+
/// service.
122123
#[snafu(display("REST API error: {message}"))]
123124
API {
124125
/// Operation status (HTTP) code.

src/dx/presence/builders/heartbeat.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ pub struct HeartbeatRequest<T, D> {
9797
/// ```
9898
#[builder(
9999
field(vis = "pub(in crate::dx::presence)"),
100-
setter(custom, strip_option)
100+
setter(custom, strip_option),
101+
default = "None"
101102
)]
102103
pub(in crate::dx::presence) state: Option<Vec<u8>>,
103104

src/dx/presence/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl<T, D> PubNubClientInstance<T, D> {
101101
/// `user_id` on channels.
102102
///
103103
/// Instance of [`LeaveRequestBuilder`] returned.
104-
pub(in crate::dx::presence) fn leave(&self) -> LeaveRequestBuilder<T, D> {
104+
pub(crate) fn leave(&self) -> LeaveRequestBuilder<T, D> {
105105
LeaveRequestBuilder {
106106
pubnub_client: Some(self.clone()),
107107
user_id: Some(self.config.user_id.clone().to_string()),
@@ -358,7 +358,7 @@ where
358358
/// Prepare presence event engine instance which will be used for `user_id`
359359
/// presence announcement and management.
360360
#[cfg(feature = "std")]
361-
pub(crate) fn presence_event_engine(&self) -> Arc<PresenceEventEngine> {
361+
fn presence_event_engine(&self) -> Arc<PresenceEventEngine> {
362362
let channel_bound = 3;
363363
let (cancel_tx, cancel_rx) = async_channel::bounded::<String>(channel_bound);
364364
let delayed_heartbeat_cancel_rx = cancel_rx.clone();

src/dx/presence/presence_manager.rs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! presence / heartbeat module components.
55
66
use crate::{
7-
dx::presence::event_engine::{PresenceEvent, PresenceEventEngine},
7+
dx::presence::event_engine::PresenceEventEngine,
88
lib::{
99
alloc::sync::Arc,
1010
core::{
@@ -88,29 +88,41 @@ pub(crate) struct PresenceManagerRef {
8888

8989
impl PresenceManagerRef {
9090
/// Announce `join` for `user_id` on provided channels and groups.
91-
#[allow(dead_code)]
9291
pub(crate) fn announce_join(
9392
&self,
94-
channels: Option<Vec<String>>,
95-
channel_groups: Option<Vec<String>>,
93+
_channels: Option<Vec<String>>,
94+
_channel_groups: Option<Vec<String>>,
9695
) {
97-
self.event_engine.process(&PresenceEvent::Joined {
98-
channels,
99-
channel_groups,
100-
})
96+
// TODO: Uncomment after contract test server fix.
97+
// self.event_engine.process(&PresenceEvent::Joined {
98+
// channels,
99+
// channel_groups,
100+
// })
101101
}
102102

103103
/// Announce `leave` for `user_id` on provided channels and groups.
104-
#[allow(dead_code)]
105104
pub(crate) fn announce_left(
106105
&self,
107-
channels: Option<Vec<String>>,
108-
channel_groups: Option<Vec<String>>,
106+
_channels: Option<Vec<String>>,
107+
_channel_groups: Option<Vec<String>>,
109108
) {
110-
self.event_engine.process(&PresenceEvent::Left {
111-
channels,
112-
channel_groups,
113-
})
109+
// TODO: Uncomment after contract test server fix.
110+
// self.event_engine.process(&PresenceEvent::Left {
111+
// channels,
112+
// channel_groups,
113+
// })
114+
}
115+
116+
/// Announce `leave` while client disconnected.
117+
pub(crate) fn disconnect(&self) {
118+
// TODO: Uncomment after contract test server fix.
119+
// self.event_engine.process(&PresenceEvent::Disconnect);
120+
}
121+
122+
/// Announce `join` upon client connection.
123+
pub(crate) fn reconnect(&self) {
124+
// TODO: Uncomment after contract test server fix.
125+
// self.event_engine.process(&PresenceEvent::Reconnect);
114126
}
115127
}
116128

src/dx/pubnub_client.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ pub type PubNubGenericClient<T, D> = PubNubClientInstance<PubNubMiddleware<T>, D
137137
/// You must provide a valid [`Keyset`] with pub/sub keys and a string User ID
138138
/// to identify the client.
139139
///
140-
/// To see available methods, please refer to the [`PubNubClientInstance`] documentation.
140+
/// To see available methods, please refer to the [`PubNubClientInstance`]
141+
/// documentation.
141142
///
142143
/// # Examples
143144
/// ```
@@ -216,7 +217,8 @@ pub type PubNubClient = PubNubGenericClient<TransportReqwest, DeserializerSerde>
216217
/// PubNub client raw instance.
217218
///
218219
/// This struct contains the actual client state.
219-
/// It shouldn't be used directly. Use [`PubNubGenericClient`] or [`PubNubClient`] instead.
220+
/// It shouldn't be used directly. Use [`PubNubGenericClient`] or
221+
/// [`PubNubClient`] instead.
220222
#[derive(Debug)]
221223
pub struct PubNubClientInstance<T, D> {
222224
pub(crate) inner: Arc<PubNubClientRef<T, D>>,
@@ -592,7 +594,8 @@ pub struct PubNubClientBuilder;
592594
impl PubNubClientBuilder {
593595
/// Set the transport layer for the client.
594596
///
595-
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled `features` following can be set:
597+
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled
598+
/// `features` following can be set:
596599
/// * runtime environment
597600
/// * API ket set to access [`PubNub API`].
598601
///
@@ -641,7 +644,8 @@ impl PubNubClientBuilder {
641644

642645
/// Set the transport layer for the client.
643646
///
644-
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set:
647+
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled
648+
/// `features` following can be set:
645649
/// * [`PubNub API`] response deserializer
646650
/// * API ket set to access [`PubNub API`].
647651
///
@@ -690,7 +694,8 @@ impl PubNubClientBuilder {
690694

691695
/// Set the blocking transport layer for the client.
692696
///
693-
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled `features` following can be set:
697+
/// Returns [`PubNubClientRuntimeBuilder`] where depending from enabled
698+
/// `features` following can be set:
694699
/// * runtime environment
695700
/// * API ket set to access [`PubNub API`].
696701
///
@@ -742,7 +747,8 @@ impl PubNubClientBuilder {
742747

743748
/// Set the blocking transport layer for the client.
744749
///
745-
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set:
750+
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled
751+
/// `features` following can be set:
746752
/// * [`PubNub API`] response deserializer
747753
/// * API ket set to access [`PubNub API`].
748754
///
@@ -795,8 +801,8 @@ impl PubNubClientBuilder {
795801

796802
/// PubNub builder for [`PubNubClient`] to set API keys.
797803
///
798-
/// The builder provides methods to set the [`PubNub API`] keys set and returns the next
799-
/// step of the builder with the remaining parameters.
804+
/// The builder provides methods to set the [`PubNub API`] keys set and returns
805+
/// the next step of the builder with the remaining parameters.
800806
///
801807
/// See [`PubNubClient`] for more information.
802808
///
@@ -862,7 +868,8 @@ impl<T, D> PubNubClientKeySetBuilder<T, D> {
862868
/// Runtime will be used for detached tasks spawning and delayed task execution.
863869
///
864870
/// Depending from enabled `features` methods may return:
865-
/// * [`PubNubClientDeserializerBuilder`] to set custom [`PubNub API`] deserializer
871+
/// * [`PubNubClientDeserializerBuilder`] to set custom [`PubNub API`]
872+
/// deserializer
866873
/// * [`PubNubClientKeySetBuilder`] to set API keys set to access [`PubNub API`]
867874
/// * [`PubNubClientUserIdBuilder`] to set user id for the client.
868875
///
@@ -877,7 +884,8 @@ pub struct PubNubClientRuntimeBuilder<T> {
877884
impl<T> PubNubClientRuntimeBuilder<T> {
878885
/// Set runtime environment.
879886
///
880-
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled `features` following can be set:
887+
/// Returns [`PubNubClientDeserializerBuilder`] where depending from enabled
888+
/// `features` following can be set:
881889
/// * [`PubNub API`] response deserializer
882890
/// * API ket set to access [`PubNub API`].
883891
///
@@ -1244,7 +1252,6 @@ where
12441252
/// secret_key: Some("sec-c-abc123"),
12451253
/// };
12461254
/// ```
1247-
///
12481255
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
12491256
pub struct Keyset<S>
12501257
where

src/dx/subscribe/builders/raw.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::{
3434
///
3535
/// It should not be created directly, but via [`PubNubClient::subscribe`]
3636
/// and wrapped in [`Subscription`] struct.
37-
#[derive(Debug, Builder)]
37+
#[derive(Builder)]
3838
#[builder(
3939
pattern = "owned",
4040
name = "RawSubscriptionBuilder",

src/dx/subscribe/builders/subscribe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::{core::event_engine::cancel::CancellationTask, lib::alloc::sync::Arc}
4444
/// from the [`PubNub`] network.
4545
///
4646
/// [`PubNub`]:https://www.pubnub.com/
47-
#[derive(Debug, Builder)]
47+
#[derive(Builder)]
4848
#[builder(
4949
pattern = "owned",
5050
build_fn(vis = "pub(in crate::dx::subscribe)", validate = "Self::validate"),

src/dx/subscribe/builders/subscription.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ pub struct SubscriptionStreamRef<D> {
9090
///
9191
/// Handler used each time when new data available for a stream listener.
9292
waker: RwLock<Option<Waker>>,
93+
94+
/// Whether stream still valid or not.
95+
is_valid: bool,
9396
}
9497

9598
/// Subscription that is responsible for getting messages from PubNub.
@@ -505,6 +508,27 @@ impl Subscription {
505508
let subscription = &update.subscription();
506509
self.input.contains_channel(subscription) || self.input.contains_channel_group(subscription)
507510
}
511+
512+
/// Invalidate all streams.
513+
pub(crate) fn invalidate(&mut self) {
514+
let mut stream_slot = self.stream.write();
515+
if let Some(mut stream) = stream_slot.clone() {
516+
stream.invalidate()
517+
}
518+
*stream_slot = None;
519+
520+
let mut stream_slot = self.status_stream.write();
521+
if let Some(mut stream) = stream_slot.clone() {
522+
stream.invalidate()
523+
}
524+
*stream_slot = None;
525+
526+
let mut stream_slot = self.updates_stream.write();
527+
if let Some(mut stream) = stream_slot.clone() {
528+
stream.invalidate()
529+
}
530+
*stream_slot = None;
531+
}
508532
}
509533

510534
impl<D> SubscriptionStream<D> {
@@ -516,10 +540,16 @@ impl<D> SubscriptionStream<D> {
516540
inner: Arc::new(SubscriptionStreamRef {
517541
updates: RwLock::new(stream_updates),
518542
waker: RwLock::new(None),
543+
is_valid: true,
519544
}),
520545
}
521546
}
522547

548+
pub(crate) fn invalidate(&mut self) {
549+
self.is_valid = false;
550+
self.wake_task();
551+
}
552+
523553
fn wake_task(&self) {
524554
if let Some(waker) = self.waker.write().take() {
525555
waker.wake();
@@ -531,6 +561,10 @@ impl<D> Stream for SubscriptionStream<D> {
531561
type Item = D;
532562

533563
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
564+
if !self.is_valid {
565+
return Poll::Ready(None);
566+
}
567+
534568
let mut waker_slot = self.waker.write();
535569
*waker_slot = Some(cx.waker().clone());
536570

src/dx/subscribe/event_engine/effects/handshake.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ pub(super) async fn execute(
1919
input.channel_groups().unwrap_or(Vec::new())
2020
);
2121

22+
if input.is_empty {
23+
return vec![SubscribeEvent::UnsubscribeAll];
24+
}
25+
2226
executor(SubscriptionParams {
2327
channels: &input.channels(),
2428
channel_groups: &input.channel_groups(),

0 commit comments

Comments
 (0)