Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions src/dialog/client_dialog.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
use super::dialog::DialogInnerRef;
use super::DialogId;
use crate::dialog::{
authenticate::handle_client_authenticate,
dialog::{DialogInner, DialogState, TerminatedReason},
};
use crate::dialog::dialog::DialogInner;
use crate::rsip_ext::RsipResponseExt;
use crate::transaction::transaction::Transaction;
use crate::Result;
use rsip::prelude::HeadersExt;
use crate::{
dialog::{
authenticate::handle_client_authenticate,
dialog::{DialogState, TerminatedReason},
},
rsip_ext::extract_uri_from_contact,
};
use rsip::{
headers::Route,
prelude::{HeadersExt, ToTypedHeader, UntypedHeader},
Header,
};
use rsip::{Response, SipMessage, StatusCode};
use std::sync::atomic::Ordering;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -521,13 +529,33 @@ impl ClientInviteDialog {
}
match resp.status_code {
StatusCode::OK => {
if let Ok(contact) = resp.contact_header() {
self.inner
.remote_contact
.lock()
.unwrap()
.replace(contact.clone());
// 200 response to INVITE always contains Contact header
let contact = resp.contact_header()?;
self.inner
.remote_contact
.lock()
.unwrap()
.replace(contact.clone());

// update remote uri
let uri = if let Ok(typed_contact) = contact.typed() {
typed_contact.uri
} else {
let mut uri = extract_uri_from_contact(contact.value())?;
uri.headers.clear();
uri
};
*self.inner.remote_uri.lock().unwrap() = uri;

// update route set from Record-Route header
let mut route_set = Vec::new();
for header in resp.headers.iter() {
if let Header::RecordRoute(record_route) = header {
route_set.push(Route::from(record_route.value()));
}
}
*self.inner.route_set.lock().unwrap() = route_set;

self.inner
.transition(DialogState::Confirmed(dialog_id.clone()))?;
DialogInner::serve_keepalive_options(self.inner.clone());
Expand Down
40 changes: 16 additions & 24 deletions src/dialog/dialog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use super::{
DialogId,
};
use crate::{
header_pop,
rsip_ext::extract_uri_from_contact,
transaction::{
endpoint::EndpointInnerRef,
Expand Down Expand Up @@ -173,13 +172,13 @@ pub struct DialogInner {
pub remote_contact: Mutex<Option<rsip::headers::untyped::Contact>>,

pub remote_seq: AtomicU32,
pub remote_uri: rsip::Uri,
pub remote_uri: Mutex<rsip::Uri>,

pub from: String,
pub to: Mutex<String>,

pub credential: Option<Credential>,
pub route_set: Vec<Route>,
pub route_set: Mutex<Vec<Route>>,
pub(super) endpoint_inner: EndpointInnerRef,
pub(super) state_sender: DialogStateSender,
pub(super) tu_sender: TransactionEventSender,
Expand Down Expand Up @@ -244,21 +243,17 @@ impl DialogInner {
}
}

// For UAS (server), route set must be reversed (RFC 3261 section 12.1.1)
if role == TransactionRole::Server {
route_set.reverse();
}
Ok(Self {
role,
cancel_token: CancellationToken::new(),
id: Mutex::new(id.clone()),
from,
to: Mutex::new(to),
local_seq: AtomicU32::new(cseq),
remote_uri,
remote_uri: Mutex::new(remote_uri),
remote_seq: AtomicU32::new(0),
credential,
route_set,
route_set: Mutex::new(route_set),
endpoint_inner,
state_sender,
tu_sender,
Expand Down Expand Up @@ -344,8 +339,9 @@ impl DialogInner {
.as_ref()
.map(|c| headers.push(Contact::from(c.clone()).into()));

for route in &self.route_set {
headers.push(Header::Route(route.clone()));
{
let route_set = self.route_set.lock().unwrap();
headers.extend(route_set.iter().cloned().map(Header::Route));
}
headers.push(Header::MaxForwards(70.into()));

Expand All @@ -355,7 +351,7 @@ impl DialogInner {

let req = rsip::Request {
method,
uri: self.remote_uri.clone(),
uri: self.remote_uri.lock().unwrap().clone(),
headers: headers.into(),
body: body.unwrap_or_default(),
version: rsip::Version::V2,
Expand Down Expand Up @@ -450,26 +446,20 @@ impl DialogInner {
}
}

pub(super) async fn do_request(&self, mut request: Request) -> Result<Option<rsip::Response>> {
pub(super) async fn do_request(&self, request: Request) -> Result<Option<rsip::Response>> {
let method = request.method().to_owned();
let mut destination = request
.route_header()
.map(|r| {
r.typed()
.ok()
.map(|r| r.uris().first().map(|u| u.uri.clone()))
.flatten()
})
.flatten();
let mut destination = request.route_header().and_then(|r| {
r.typed()
.ok()
.and_then(|r| r.uris().first().map(|u| u.uri.clone()))
});

if destination.is_none() {
if let Some(contact) = self.remote_contact.lock().unwrap().as_ref() {
destination = contact.uri().ok();
}
}

header_pop!(request.headers, Header::Route);

let key = TransactionKey::from_request(&request, TransactionRole::Client)?;
let mut tx = Transaction::new_client(key, request, self.endpoint_inner.clone(), None);
tx.destination = destination.as_ref().map(|d| d.try_into().ok()).flatten();
Expand Down Expand Up @@ -593,6 +583,8 @@ impl DialogInner {

tokio::spawn(async move {
let mut ticker = interval(keepalive);
// skip first tick, which will be reached immediately
ticker.tick().await;
let keepalive_loop = async {
loop {
ticker.tick().await;
Expand Down
73 changes: 59 additions & 14 deletions src/transaction/message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use crate::transaction::make_via_branch;
use crate::{rsip_ext::extract_uri_from_contact, transaction::make_via_branch};

use super::{endpoint::EndpointInner, make_call_id};
use rsip::{prelude::ToTypedHeader, Header, Request, Response, StatusCode};
use rsip::{
header,
headers::Route,
prelude::{HeadersExt, ToTypedHeader, UntypedHeader},
Error, Header, Request, Response, StatusCode,
};

impl EndpointInner {
/// Create a SIP request message
Expand Down Expand Up @@ -235,20 +240,59 @@ impl EndpointInner {
}
}

pub fn make_ack(&self, uri: rsip::Uri, resp: &Response) -> Request {
pub fn make_ack(&self, mut uri: rsip::Uri, resp: &Response) -> crate::Result<Request> {
let mut headers = resp.headers.clone();
// Check if response to INVITE
let mut resp_to_invite = false;
if let Ok(cseq) = resp.cseq_header() {
if let Ok(rsip::Method::Invite) = cseq.method() {
resp_to_invite = true;
}
}

if resp.status_code.kind() == rsip::StatusCodeKind::Successful {
for h in headers.iter_mut() {
if let Header::Via(via) = h {
if let Ok(mut typed_via) = via.typed() {
typed_via
.params
.retain(|p| !matches!(p, rsip::Param::Branch(_)));
*via = typed_via.with_param(make_via_branch()).untyped();
// For 200 OK responses to INVITE, this creates an ACK with:
// 1. Request-URI from Contact header
// 2. Route headers from Record-Route headers
// 3. New Via branch (creates new transaction)
//
// **FIXME**: This duplicates logic in `client_dialog.rs` - should be refactored
if resp_to_invite && resp.status_code.kind() == rsip::StatusCodeKind::Successful {
if let Ok(top_most_via) = header!(
headers.iter_mut(),
Header::Via,
Error::missing_header("Via")
) {
if let Ok(mut typed_via) = top_most_via.typed() {
for param in typed_via.params.iter_mut() {
if let rsip::Param::Branch(_) = param {
*param = make_via_branch();
}
}
*top_most_via = typed_via.into();
}
}

let contact = resp.contact_header()?;

let contact_uri = if let Ok(typed_contact) = contact.typed() {
typed_contact.uri
} else {
let mut uri = extract_uri_from_contact(contact.value())?;
uri.headers.clear();
uri
};
uri = contact_uri;

// update route set from Record-Route header
let mut route_set = Vec::new();
for header in resp.headers.iter() {
if let Header::RecordRoute(record_route) = header {
route_set.push(Header::Route(Route::from(record_route.value())));
}
}

route_set.reverse();
headers.extend(route_set);
}

headers.retain(|h| {
Expand All @@ -258,22 +302,23 @@ impl EndpointInner {
| Header::CallId(_)
| Header::From(_)
| Header::To(_)
| Header::MaxForwards(_)
| Header::CSeq(_)
| Header::Route(_)
)
});
headers.push(Header::MaxForwards(70.into()));
headers.iter_mut().for_each(|h| {
if let Header::CSeq(cseq) = h {
cseq.mut_method(rsip::Method::Ack).ok();
}
});
headers.unique_push(Header::UserAgent(self.user_agent.clone().into()));
rsip::Request {
Ok(rsip::Request {
method: rsip::Method::Ack,
uri,
headers: headers.into(),
body: vec![],
version: rsip::Version::V2,
}
})
}
}
2 changes: 1 addition & 1 deletion src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ impl Transaction {
None => match self.last_response {
Some(ref resp) => self
.endpoint_inner
.make_ack(self.original.uri.clone(), resp),
.make_ack(self.original.uri.clone(), resp)?,
None => {
return Err(Error::TransactionError(
"no last response found to send ACK".to_string(),
Expand Down