Skip to content

Commit 143fe24

Browse files
committed
refactor: replace unbounded_channel with new_dialog_state_channel
1 parent 62db964 commit 143fe24

File tree

7 files changed

+81
-74
lines changed

7 files changed

+81
-74
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "rsipstack"
3-
version = "0.2.83"
3+
version = "0.2.84"
44
edition = "2021"
55
description = "SIP Stack Rust library for building SIP applications"
66
license = "MIT"

examples/client/main.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use rsipstack::{
1616
};
1717
use std::net::IpAddr;
1818
use std::{env, sync::Arc, time::Duration};
19-
use tokio::sync::mpsc::unbounded_channel;
2019
use tokio::time::timeout;
2120
use tokio::{select, time::sleep};
2221
use tokio_util::sync::CancellationToken;
@@ -212,7 +211,7 @@ async fn main() -> rsipstack::Result<()> {
212211
let incoming = endpoint.incoming_transactions()?;
213212
let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
214213

215-
let (state_sender, state_receiver) = unbounded_channel();
214+
let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
216215

217216
let first_addr = endpoint
218217
.get_addrs()

src/bin/bench_ua.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use std::{
2323
},
2424
time::{Duration, Instant},
2525
};
26-
use tokio::sync::mpsc::unbounded_channel;
2726
use tokio::{select, time::sleep};
2827
use tokio_util::sync::CancellationToken;
2928
use tracing::{debug, info};
@@ -381,7 +380,7 @@ async fn main() -> Result<()> {
381380

382381
let incoming = endpoint.incoming_transactions()?;
383382
let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
384-
let (state_sender, state_receiver) = unbounded_channel();
383+
let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
385384
let stats = Stats::new();
386385

387386
let mode_handler: BoxFuture<Result<()>> = match args.mode.as_str() {

src/dialog/dialog.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use super::{
55
DialogId,
66
};
77
use crate::{
8+
dialog::dialog_layer::DialogLayerInnerRef,
89
rsip_ext::extract_uri_from_contact,
910
transaction::{
1011
endpoint::EndpointInnerRef,
@@ -184,12 +185,75 @@ pub struct DialogInner {
184185
pub(super) initial_request: Request,
185186
}
186187

187-
pub type DialogStateReceiver = UnboundedReceiver<DialogState>;
188+
pub struct DialogStateReceiver {
189+
pub(super) dialog_layer_inner: DialogLayerInnerRef,
190+
pub(super) receiver: UnboundedReceiver<DialogState>,
191+
pub(super) dialog_id: Option<DialogId>,
192+
}
193+
194+
impl DialogStateReceiver {
195+
pub async fn recv(&mut self) -> Option<DialogState> {
196+
let state = self.receiver.recv().await;
197+
if let Some(ref s) = state {
198+
if let Some(id) = &self.dialog_id {
199+
if id != s.id() {
200+
match self.dialog_layer_inner.dialogs.write().as_mut() {
201+
Ok(dialogs) => {
202+
dialogs.remove(id);
203+
}
204+
Err(_) => {}
205+
}
206+
}
207+
}
208+
self.dialog_id = Some(s.id().clone());
209+
}
210+
state
211+
}
212+
}
213+
214+
impl Drop for DialogStateReceiver {
215+
fn drop(&mut self) {
216+
let id = match self.dialog_id.take() {
217+
Some(id) => id,
218+
None => return,
219+
};
220+
221+
match self.dialog_layer_inner.dialogs.write().as_mut() {
222+
Ok(dialogs) => {
223+
if let Some(dialog) = dialogs.remove(&id) {
224+
info!(%id, "dialog removed on state receiver drop");
225+
tokio::spawn(async move {
226+
if let Err(e) = dialog.hangup().await {
227+
warn!(%id, "error hanging up dialog on drop: {}", e);
228+
}
229+
});
230+
}
231+
}
232+
Err(_) => {}
233+
}
234+
}
235+
}
236+
188237
pub type DialogStateSender = UnboundedSender<DialogState>;
189238

190239
pub(super) type DialogInnerRef = Arc<DialogInner>;
191240

192241
impl DialogState {
242+
pub fn id(&self) -> &DialogId {
243+
match self {
244+
DialogState::Calling(id)
245+
| DialogState::Trying(id)
246+
| DialogState::Early(id, _)
247+
| DialogState::WaitAck(id, _)
248+
| DialogState::Confirmed(id, _)
249+
| DialogState::Updated(id, _)
250+
| DialogState::Notify(id, _)
251+
| DialogState::Info(id, _)
252+
| DialogState::Options(id, _)
253+
| DialogState::Terminated(id, _) => id,
254+
}
255+
}
256+
193257
pub fn can_cancel(&self) -> bool {
194258
matches!(
195259
self,

src/dialog/dialog_layer.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::authenticate::Credential;
22
use super::dialog::DialogStateSender;
33
use super::{dialog::Dialog, server_dialog::ServerInviteDialog, DialogId};
4-
use crate::dialog::dialog::DialogInner;
4+
use crate::dialog::dialog::{DialogInner, DialogStateReceiver};
55
use crate::transaction::key::TransactionRole;
66
use crate::transaction::make_tag;
77
use crate::transaction::{endpoint::EndpointInnerRef, transaction::Transaction};
@@ -233,4 +233,14 @@ impl DialogLayer {
233233
let id = DialogId::try_from(req).ok()?;
234234
self.get_dialog(&id)
235235
}
236+
237+
pub fn new_dialog_state_channel(&self) -> (DialogStateSender, DialogStateReceiver) {
238+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
239+
let receiver = DialogStateReceiver {
240+
dialog_layer_inner: self.inner.clone(),
241+
receiver: rx,
242+
dialog_id: None,
243+
};
244+
(tx, receiver)
245+
}
236246
}

src/dialog/invitation.rs

Lines changed: 2 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use super::{
55
dialog_layer::DialogLayer,
66
};
77
use crate::{
8-
dialog::{dialog::Dialog, dialog_layer::DialogLayerInnerRef, DialogId},
8+
dialog::{dialog::Dialog, DialogId},
99
transaction::{
1010
key::{TransactionKey, TransactionRole},
1111
make_tag,
@@ -19,7 +19,7 @@ use rsip::{
1919
Request, Response,
2020
};
2121
use std::sync::Arc;
22-
use tracing::{debug, info, warn};
22+
use tracing::{debug, info};
2323

2424
/// INVITE Request Options
2525
///
@@ -134,64 +134,6 @@ pub struct InviteOption {
134134
pub headers: Option<Vec<rsip::Header>>,
135135
}
136136

137-
pub struct DialogGuard {
138-
pub dialog_layer_inner: DialogLayerInnerRef,
139-
pub id: DialogId,
140-
}
141-
142-
impl DialogGuard {
143-
pub fn new(dialog_layer: &Arc<DialogLayer>, id: DialogId) -> Self {
144-
Self {
145-
dialog_layer_inner: dialog_layer.inner.clone(),
146-
id,
147-
}
148-
}
149-
}
150-
151-
impl Drop for DialogGuard {
152-
fn drop(&mut self) {
153-
let dlg = match self.dialog_layer_inner.dialogs.write() {
154-
Ok(mut dialogs) => match dialogs.remove(&self.id) {
155-
Some(dlg) => dlg,
156-
None => return,
157-
},
158-
_ => return,
159-
};
160-
let _ = tokio::spawn(async move {
161-
if let Err(e) = dlg.hangup().await {
162-
info!(id=%dlg.id(), "failed to hangup dialog: {}", e);
163-
}
164-
});
165-
}
166-
}
167-
168-
pub(super) struct DialogGuardForUnconfirmed<'a> {
169-
pub dialog_layer_inner: &'a DialogLayerInnerRef,
170-
pub id: &'a DialogId,
171-
}
172-
173-
impl<'a> Drop for DialogGuardForUnconfirmed<'a> {
174-
fn drop(&mut self) {
175-
// If the dialog is still unconfirmed, we should try to cancel it
176-
match self.dialog_layer_inner.dialogs.write() {
177-
Ok(mut dialogs) => match dialogs.remove(self.id) {
178-
Some(dlg) => {
179-
info!(%self.id, "unconfirmed dialog dropped, cancelling it");
180-
let _ = tokio::spawn(async move {
181-
if let Err(e) = dlg.hangup().await {
182-
info!(id=%dlg.id(), "failed to hangup unconfirmed dialog: {}", e);
183-
}
184-
});
185-
}
186-
None => {}
187-
},
188-
Err(e) => {
189-
warn!(%self.id, "failed to acquire write lock on dialogs: {}", e);
190-
}
191-
}
192-
}
193-
}
194-
195137
impl DialogLayer {
196138
/// Create an INVITE request from options
197139
///
@@ -396,12 +338,6 @@ impl DialogLayer {
396338
.unwrap()
397339
.insert(id.clone(), Dialog::ClientInvite(dialog.clone()));
398340
info!(%id, "client invite dialog created");
399-
400-
let _guard = DialogGuardForUnconfirmed {
401-
dialog_layer_inner: &self.inner,
402-
id: &id,
403-
};
404-
405341
match dialog.process_invite(tx).await {
406342
Ok((new_dialog_id, resp)) => {
407343
debug!(

src/dialog/server_dialog.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,6 @@ impl ServerInviteDialog {
572572
// discard old request
573573
return Ok(());
574574
}
575-
576575
self.inner
577576
.remote_seq
578577
.compare_exchange(remote_seq, cseq, Ordering::Relaxed, Ordering::Relaxed)

0 commit comments

Comments
 (0)