Skip to content

Commit a20bf21

Browse files
committed
add timeline test
1 parent d04d07a commit a20bf21

File tree

1 file changed

+224
-7
lines changed

1 file changed

+224
-7
lines changed

pulsebeam/src/rtp/timeline.rs

Lines changed: 224 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub struct Timeline {
1818

1919
impl Timeline {
2020
pub fn new(clock_rate: Frequency) -> Self {
21-
let base_seq_no: u8 = rand::random();
21+
let base_seq_no: u16 = rand::random();
2222
Self {
2323
clock_rate,
2424
highest_seq_no: SeqNo::from(base_seq_no as u64),
@@ -27,29 +27,246 @@ impl Timeline {
2727
}
2828
}
2929

30-
// packet is guaranteed to be the first packet after a marker and is a keyframe
30+
/// Re-aligns the timeline to a new stream starting with `packet`.
31+
/// `packet` must be a keyframe (start of a new GOP).
3132
pub fn rebase(&mut self, packet: &RtpPacket) {
32-
debug_assert!(packet.is_keyframe_start);
33+
debug_assert!(
34+
packet.is_keyframe_start,
35+
"Rebase should only happen on keyframe boundaries"
36+
);
3337

38+
// We want the new packet to immediately follow the last output packet.
39+
// target = highest_output + 1
40+
// target = input + offset => offset = target - input
3441
let target_seq_no = self.highest_seq_no.wrapping_add(1);
3542
self.offset_seq_no = target_seq_no.wrapping_sub(*packet.seq_no);
3643

44+
// Initialize anchor if this is the very first packet
3745
if self.anchor.is_none() {
3846
self.anchor = Some(packet.playout_time);
3947
}
4048
}
4149

4250
pub fn rewrite(&mut self, mut pkt: RtpPacket) -> RtpPacket {
43-
pkt.seq_no = pkt.seq_no.wrapping_add(self.offset_seq_no).into();
44-
if pkt.seq_no > self.highest_seq_no {
51+
let new_seq_u64 = pkt.seq_no.wrapping_add(self.offset_seq_no);
52+
pkt.seq_no = new_seq_u64.into();
53+
54+
if new_seq_u64 > *self.highest_seq_no {
4555
self.highest_seq_no = pkt.seq_no;
4656
}
4757

4858
let anchor = self
4959
.anchor
5060
.expect("rebase must have occured before rewriting to create the first anchor");
51-
pkt.rtp_ts = MediaTime::from(pkt.playout_time.saturating_duration_since(anchor))
52-
.rebase(self.clock_rate);
61+
62+
let duration = pkt.playout_time.saturating_duration_since(anchor);
63+
pkt.rtp_ts = MediaTime::from(duration).rebase(self.clock_rate);
64+
5365
pkt
5466
}
5567
}
68+
69+
#[cfg(test)]
70+
mod test {
71+
use super::*;
72+
use std::time::Duration;
73+
74+
#[test]
75+
fn test_simple_continuity() {
76+
let start_time = Instant::now();
77+
let mut timeline = Timeline::new(Frequency::NINETY_KHZ);
78+
79+
// Packet 1: First packet (Keyframe) - requires rebase
80+
let p1 = RtpPacket {
81+
seq_no: 100.into(),
82+
playout_time: start_time,
83+
is_keyframe_start: true,
84+
..Default::default()
85+
};
86+
87+
timeline.rebase(&p1);
88+
let out1 = timeline.rewrite(p1);
89+
let base_seq = *out1.seq_no;
90+
91+
// Packet 2: Regular packet, 100ms later
92+
let p2 = RtpPacket {
93+
seq_no: 101.into(),
94+
playout_time: start_time + Duration::from_millis(100),
95+
is_keyframe_start: false,
96+
..Default::default()
97+
};
98+
99+
let out2 = timeline.rewrite(p2);
100+
101+
// Verify Sequence Continuity
102+
assert_eq!(
103+
*out2.seq_no,
104+
base_seq + 1,
105+
"Sequence number should increment by 1"
106+
);
107+
108+
// Verify Timestamp: 100ms at 90kHz = 9000 ticks
109+
let ts_diff = out2.rtp_ts.numer().wrapping_sub(out1.rtp_ts.numer());
110+
assert_eq!(ts_diff, 9000, "Timestamp should correspond to 100ms delta");
111+
}
112+
113+
#[test]
114+
fn test_switching_streams() {
115+
let start_time = Instant::now();
116+
let mut timeline = Timeline::new(Frequency::NINETY_KHZ);
117+
118+
// --- Stream A (Seq 1000-1001) ---
119+
let p_a1 = RtpPacket {
120+
seq_no: 1000.into(),
121+
playout_time: start_time,
122+
is_keyframe_start: true,
123+
..Default::default()
124+
};
125+
126+
timeline.rebase(&p_a1);
127+
let out_a1 = timeline.rewrite(p_a1);
128+
129+
let p_a2 = RtpPacket {
130+
seq_no: 1001.into(),
131+
playout_time: start_time + Duration::from_millis(33),
132+
is_keyframe_start: false,
133+
..Default::default()
134+
};
135+
let out_a2 = timeline.rewrite(p_a2);
136+
137+
// --- Switch to Stream B (Seq 5000) ---
138+
// Stream B arrives 100ms after start, starting at random Seq 5000
139+
let p_b1 = RtpPacket {
140+
seq_no: 5000.into(),
141+
playout_time: start_time + Duration::from_millis(100),
142+
is_keyframe_start: true,
143+
..Default::default()
144+
};
145+
146+
timeline.rebase(&p_b1); // Rebase aligns Stream B to follow Stream A
147+
let out_b1 = timeline.rewrite(p_b1);
148+
149+
// Verify Continuity across switch
150+
assert_eq!(
151+
*out_b1.seq_no,
152+
(*out_a2.seq_no).wrapping_add(1),
153+
"Output sequence must be continuous across switch"
154+
);
155+
156+
// Verify Timestamp linearity
157+
// A1=0ms, B1=100ms. Diff should be 9000 ticks.
158+
// The timeline calculates B1 based on (B1.time - anchor), where anchor is A1.time
159+
let ts_diff = out_b1.rtp_ts.numer().wrapping_sub(out_a1.rtp_ts.numer());
160+
assert_eq!(ts_diff, 9000, "Timestamp should be linear across switch");
161+
}
162+
163+
#[test]
164+
fn test_sequence_wrapping_u64() {
165+
let start_time = Instant::now();
166+
let mut timeline = Timeline::new(Frequency::NINETY_KHZ);
167+
168+
// Start normally
169+
let p1 = RtpPacket {
170+
seq_no: 10.into(),
171+
playout_time: start_time,
172+
is_keyframe_start: true,
173+
..Default::default()
174+
};
175+
timeline.rebase(&p1);
176+
let out1 = timeline.rewrite(p1);
177+
178+
// Manually force the internal highest_seq_no to u64 boundary - 1
179+
// We can't modify private state directly, so we simulate a stream
180+
// that pushes the timeline to the edge.
181+
// Or, we rely on the fact that `rebase` calculates offset based on wrapping arithmetic.
182+
183+
// Let's simulate a switch to a stream where the calculation wraps.
184+
// Current Out: X.
185+
// We want next Out: X+1.
186+
// Input is Y.
187+
// Offset = (X+1) - Y.
188+
189+
// We simply verify strict +1 increments even if input jumps largely
190+
let p_next = RtpPacket {
191+
seq_no: u64::MAX.into(), // Input at boundary
192+
playout_time: start_time + Duration::from_millis(33),
193+
is_keyframe_start: true,
194+
..Default::default()
195+
};
196+
197+
// Switch to high sequence number
198+
timeline.rebase(&p_next);
199+
let out_next = timeline.rewrite(p_next);
200+
201+
assert_eq!(*out_next.seq_no, (*out1.seq_no).wrapping_add(1));
202+
203+
// Next packet wraps the input
204+
let p_wrap = RtpPacket {
205+
seq_no: 0.into(),
206+
playout_time: start_time + Duration::from_millis(66),
207+
is_keyframe_start: false,
208+
..Default::default()
209+
};
210+
211+
// Normal rewrite (no rebase needed for contiguous input)
212+
// Input: u64::MAX -> 0 (wrapped)
213+
// Output: X+1 -> X+2
214+
let out_wrap = timeline.rewrite(p_wrap);
215+
assert_eq!(*out_wrap.seq_no, (*out_next.seq_no).wrapping_add(1));
216+
}
217+
218+
#[test]
219+
#[should_panic(expected = "rebase must have occured before rewriting")]
220+
fn test_panic_if_no_rebase() {
221+
let start_time = Instant::now();
222+
let mut timeline = Timeline::new(Frequency::NINETY_KHZ);
223+
224+
// Packet without rebase
225+
let p1 = RtpPacket {
226+
seq_no: 100.into(),
227+
playout_time: start_time,
228+
is_keyframe_start: true,
229+
..Default::default()
230+
};
231+
232+
// This should panic because anchor is None
233+
timeline.rewrite(p1);
234+
}
235+
236+
#[test]
237+
fn test_late_packet_ordering() {
238+
// Ensures that if a packet arrives late (playout time < previous),
239+
// the timestamp is calculated correctly relative to anchor.
240+
let start_time = Instant::now();
241+
let mut timeline = Timeline::new(Frequency::NINETY_KHZ);
242+
243+
// Packet 1 (Base)
244+
let p1 = RtpPacket {
245+
seq_no: 10.into(),
246+
playout_time: start_time + Duration::from_millis(100),
247+
is_keyframe_start: true,
248+
..Default::default()
249+
};
250+
timeline.rebase(&p1); // Sets anchor at T+100ms
251+
let _out1 = timeline.rewrite(p1);
252+
253+
// Packet 2 (Arrives with earlier playout time due to reordering/jitter)
254+
// T+50ms (Before anchor)
255+
// Since `saturating_duration_since` is used, this should clamp to 0
256+
// or handle gracefully depending on logic.
257+
// The current logic: playout.saturating_duration_since(anchor)
258+
// If playout < anchor, result is 0.
259+
let p2 = RtpPacket {
260+
seq_no: 11.into(),
261+
playout_time: start_time + Duration::from_millis(50),
262+
is_keyframe_start: false,
263+
..Default::default()
264+
};
265+
266+
let out2 = timeline.rewrite(p2);
267+
268+
// Anchor is at T+100. P2 is at T+50.
269+
// saturating_duration_since(100) returns 0.
270+
assert_eq!(out2.rtp_ts.numer(), 0);
271+
}
272+
}

0 commit comments

Comments
 (0)