Skip to content

Commit d3fc3f2

Browse files
authored
Use RTPConverter from protocol/utils/rtputil (#781)
1 parent 7969942 commit d3fc3f2

File tree

4 files changed

+23
-63
lines changed

4 files changed

+23
-63
lines changed

go.mod

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
1111
github.com/livekit/media-sdk v0.0.0-20250927154350-bd99739b439b
1212
github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade
13-
github.com/livekit/protocol v1.42.2-0.20251016024155-8cf58ff15ac6
13+
github.com/livekit/protocol v1.42.3-0.20251022084609-f19569a346e2
1414
github.com/magefile/mage v1.15.0
1515
github.com/pion/dtls/v3 v3.0.7
1616
github.com/pion/interceptor v0.1.41
@@ -29,8 +29,6 @@ require (
2929
require (
3030
github.com/moby/buildkit v0.25.1
3131
github.com/moby/patternmatcher v0.6.0
32-
github.com/pelletier/go-toml v1.9.5
33-
github.com/schollz/progressbar/v3 v3.18.0
3432
golang.org/x/mod v0.29.0
3533
)
3634

@@ -60,7 +58,6 @@ require (
6058
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect
6159
github.com/in-toto/in-toto-golang v0.9.0 // indirect
6260
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3 // indirect
63-
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
6461
github.com/moby/locker v1.0.1 // indirect
6562
github.com/moby/sys/signal v0.7.1 // indirect
6663
github.com/morikuni/aec v1.0.0 // indirect
@@ -69,7 +66,6 @@ require (
6966
github.com/opencontainers/runc v1.2.3 // indirect
7067
github.com/pkg/errors v0.9.1 // indirect
7168
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
72-
github.com/rivo/uniseg v0.4.7 // indirect
7369
github.com/secure-systems-lab/go-securesystemslib v0.6.0 // indirect
7470
github.com/shibumi/go-pathspec v1.3.0 // indirect
7571
github.com/sirupsen/logrus v1.9.3 // indirect
@@ -87,7 +83,6 @@ require (
8783
go.opentelemetry.io/otel/sdk v1.37.0 // indirect
8884
go.opentelemetry.io/otel/trace v1.37.0 // indirect
8985
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
90-
golang.org/x/term v0.36.0 // indirect
9186
golang.org/x/time v0.12.0 // indirect
9287
golang.org/x/tools v0.38.0 // indirect
9388
)

go.sum

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
3636
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
3737
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
3838
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
39-
github.com/chengxilo/virtualterm v1.0.4 h1:Z6IpERbRVlfB8WkOmtbHiDbBANU7cimRIof7mk9/PwM=
40-
github.com/chengxilo/virtualterm v1.0.4/go.mod h1:DyxxBZz/x1iqJjFxTFcr6/x+jSpqN0iwWCOK1q10rlY=
4139
github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb h1:EDmT6Q9Zs+SbUoc7Ik9EfrFqcylYqgPZ9ANSbTAntnE=
4240
github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb/go.mod h1:ZjrT6AXHbDs86ZSdt/osfBi5qfexBrKUdONk989Wnk4=
4341
github.com/containerd/cgroups/v3 v3.0.5 h1:44na7Ud+VwyE7LIoJ8JTNQOa549a8543BmzaJHo6Bzo=
@@ -157,18 +155,14 @@ github.com/livekit/media-sdk v0.0.0-20250927154350-bd99739b439b h1:dPf9i0JPyf0Sg
157155
github.com/livekit/media-sdk v0.0.0-20250927154350-bd99739b439b/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8=
158156
github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade h1:lpxPcglwzUWNB4J0S2qZuyMehzmR7vW9whzSwV4IGoI=
159157
github.com/livekit/mediatransportutil v0.0.0-20250825135402-7bc31f107ade/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
160-
github.com/livekit/protocol v1.42.2-0.20251016024155-8cf58ff15ac6 h1:Tby1v0yn0XCXl9nBVnZI9M1cQW/0o4E/ejzRgcaMETI=
161-
github.com/livekit/protocol v1.42.2-0.20251016024155-8cf58ff15ac6/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A=
158+
github.com/livekit/protocol v1.42.3-0.20251022084609-f19569a346e2 h1:WmMJMCdfKfN/WQffkt+XFRkiFZh33tgrnKBlK6Msm1w=
159+
github.com/livekit/protocol v1.42.3-0.20251022084609-f19569a346e2/go.mod h1:m8IMgMd9FUi3HnCBQMDExVumbmVfiJy9/xKOifDAQtY=
162160
github.com/livekit/psrpc v0.7.0 h1:rtfqfjYN06WJYloE/S0nmkJ/Y04x4pxLQLe8kQ4FVHU=
163161
github.com/livekit/psrpc v0.7.0/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
164162
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
165163
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
166-
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
167-
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
168164
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3 h1:Eaq36EIyJNp7b3qDhjV7jmDVq/yPeW2v4pTqzGbOGB4=
169165
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.3/go.mod h1:6KKUoQBZBW6PDXJtNfqeEjPXMj/ITTk+cWK9t9uS5+E=
170-
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
171-
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
172166
github.com/moby/buildkit v0.25.1 h1:j7IlVkeNbEo+ZLoxdudYCHpmTsbwKvhgc/6UJ/mY/o8=
173167
github.com/moby/buildkit v0.25.1/go.mod h1:phM8sdqnvgK2y1dPDnbwI6veUCXHOZ6KFSl6E164tkc=
174168
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
@@ -258,12 +252,8 @@ github.com/puzpuzpuz/xsync/v3 v3.5.1 h1:GJYJZwO6IdxN/IKbneznS6yPkVC+c3zyY/j19c++
258252
github.com/puzpuzpuz/xsync/v3 v3.5.1/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
259253
github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE=
260254
github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
261-
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
262-
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
263255
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
264256
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
265-
github.com/schollz/progressbar/v3 v3.18.0 h1:uXdoHABRFmNIjUfte/Ex7WtuyVslrw2wVPQmCN62HpA=
266-
github.com/schollz/progressbar/v3 v3.18.0/go.mod h1:IsO3lpbaGuzh8zIMzgY3+J8l4C8GjO0Y9S69eFvNsec=
267257
github.com/sclevine/spec v1.4.0 h1:z/Q9idDcay5m5irkZ28M7PtQM4aOISzOpj4bUPkDee8=
268258
github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4QnFHkOM=
269259
github.com/secure-systems-lab/go-securesystemslib v0.6.0 h1:T65atpAVCJQK14UA57LMdZGpHi4QYSH/9FZyNGqMYIA=

pkg/synchronizer/track.go

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/livekit/mediatransportutil"
3131
"github.com/livekit/protocol/logger"
3232
"github.com/livekit/protocol/utils/mono"
33+
"github.com/livekit/protocol/utils/rtputil"
3334
)
3435

3536
const (
@@ -58,7 +59,7 @@ type TrackSynchronizer struct {
5859
sync *Synchronizer
5960
track TrackRemote
6061
logger logger.Logger
61-
*rtpConverter
62+
*rtputil.RTPConverter
6263
startGate startGate
6364

6465
// config
@@ -116,7 +117,7 @@ func newTrackSynchronizer(s *Synchronizer, track TrackRemote) *TrackSynchronizer
116117
sync: s,
117118
track: track,
118119
logger: logger.GetLogger().WithValues("trackID", track.ID(), "codec", track.Codec().MimeType),
119-
rtpConverter: newRTPConverter(int64(track.Codec().ClockRate)),
120+
RTPConverter: rtputil.NewRTPConverter(int64(track.Codec().ClockRate)),
120121
maxTsDiff: s.config.MaxTsDiff,
121122
maxDriftAdjustment: s.config.MaxDriftAdjustment,
122123
driftAdjustmentWindowPercent: s.config.DriftAdjustmentWindowPercent,
@@ -304,7 +305,7 @@ func (t *TrackSynchronizer) getPTSWithoutRebase(pkt jitter.ExtPacket) (time.Dura
304305
// start with estimated PTS to absorb any start latency
305306
pts = max(time.Nanosecond, estimatedPTS) // prevent lastPTS from being stuck at 0
306307
} else {
307-
pts = t.lastPTS + t.toDuration(ts-t.lastTS)
308+
pts = t.lastPTS + t.ToDuration(ts-t.lastTS)
308309
}
309310

310311
if pts < t.lastPTS || !t.acceptable(pts-estimatedPTS) {
@@ -435,7 +436,7 @@ func (t *TrackSynchronizer) getPTSWithRebase(pkt jitter.ExtPacket) (time.Duratio
435436
// start with estimated PTS to absorb any start latency
436437
pts = max(time.Nanosecond, estimatedPTS) // prevent lastPTS from being stuck at 0
437438
} else {
438-
pts = t.lastPTS + t.toDuration(ts-t.lastTS)
439+
pts = t.lastPTS + t.ToDuration(ts-t.lastTS)
439440
}
440441

441442
if pts < t.lastPTS || !t.acceptable(pts-estimatedPTS) {
@@ -543,9 +544,9 @@ func (t *TrackSynchronizer) onSenderReportWithoutRebase(pkt *rtcp.SenderReport)
543544

544545
var pts time.Duration
545546
if pkt.RTPTime > t.lastTS {
546-
pts = t.lastPTS + t.toDuration(pkt.RTPTime-t.lastTS)
547+
pts = t.lastPTS + t.ToDuration(pkt.RTPTime-t.lastTS)
547548
} else {
548-
pts = t.lastPTS - t.toDuration(t.lastTS-pkt.RTPTime)
549+
pts = t.lastPTS - t.ToDuration(t.lastTS-pkt.RTPTime)
549550
}
550551
if !t.acceptable(pts - time.Since(t.startTime)) {
551552
t.logger.Infow(
@@ -615,9 +616,9 @@ func (t *TrackSynchronizer) onSenderReportWithRebase(pkt *rtcp.SenderReport) {
615616

616617
var ptsSR time.Duration
617618
if (pkt.RTPTime - t.lastTS) < (1 << 31) {
618-
ptsSR = t.lastPTS + t.toDuration(pkt.RTPTime-t.lastTS)
619+
ptsSR = t.lastPTS + t.ToDuration(pkt.RTPTime-t.lastTS)
619620
} else {
620-
ptsSR = t.lastPTS - t.toDuration(t.lastTS-pkt.RTPTime)
621+
ptsSR = t.lastPTS - t.ToDuration(t.lastTS-pkt.RTPTime)
621622
}
622623
if !t.acceptable(ptsSR - time.Since(t.startTime)) {
623624
t.logger.Infow(
@@ -701,7 +702,7 @@ func (t *TrackSynchronizer) maybeAdjustStartTime(asr *augmentedSenderReport) int
701702
// in some network element along the way), push back first time
702703
// to an earlier instance.
703704
timeSinceReceive := time.Duration(nowNano - asr.receivedAtAdjusted)
704-
nowTS := asr.RTPTime + t.toRTP(timeSinceReceive)
705+
nowTS := asr.RTPTime + t.ToRTP(timeSinceReceive)
705706
samplesDiff := nowTS - t.startRTP
706707
if int32(samplesDiff) < 0 {
707708
// out-of-order, pre-start, skip
@@ -716,7 +717,7 @@ func (t *TrackSynchronizer) maybeAdjustStartTime(asr *augmentedSenderReport) int
716717
return 0
717718
}
718719

719-
samplesDuration := t.toDuration(samplesDiff)
720+
samplesDuration := t.ToDuration(samplesDiff)
720721
timeSinceStart := time.Duration(nowNano - startTimeNano)
721722
now := startTimeNano + timeSinceStart.Nanoseconds()
722723
adjustedStartTimeNano := now - samplesDuration.Nanoseconds()
@@ -931,33 +932,6 @@ func (t *TrackSynchronizer) MarshalLogObject(e zapcore.ObjectEncoder) error {
931932

932933
// ---------------------------
933934

934-
type rtpConverter struct {
935-
ts uint64
936-
rtp uint64
937-
}
938-
939-
func newRTPConverter(clockRate int64) *rtpConverter {
940-
ts := time.Second.Nanoseconds()
941-
for _, i := range []int64{10, 3, 2} {
942-
for ts%i == 0 && clockRate%i == 0 {
943-
ts /= i
944-
clockRate /= i
945-
}
946-
}
947-
948-
return &rtpConverter{ts: uint64(ts), rtp: uint64(clockRate)}
949-
}
950-
951-
func (c *rtpConverter) toDuration(rtpDuration uint32) time.Duration {
952-
return time.Duration(uint64(rtpDuration) * c.ts / c.rtp)
953-
}
954-
955-
func (c *rtpConverter) toRTP(duration time.Duration) uint32 {
956-
return uint32(duration.Nanoseconds() * int64(c.rtp) / int64(c.ts))
957-
}
958-
959-
// -----------------------------
960-
961935
type augmentedSenderReport struct {
962936
*rtcp.SenderReport
963937
receivedAt int64

pkg/synchronizer/track_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/livekit/media-sdk/jitter"
2626
"github.com/livekit/protocol/logger"
2727
"github.com/livekit/protocol/utils/mono"
28+
"github.com/livekit/protocol/utils/rtputil"
2829
)
2930

3031
// ---- test fakes & helpers ----
@@ -47,7 +48,7 @@ func newTSForTests(tc *testing.T, clockRate uint32, kind webrtc.RTPCodecType) *T
4748
sync: nil, // construct directly to avoid depending on Synchronizer
4849
track: fakeTrack{id: "t", rate: clockRate, kind: kind},
4950
logger: logger.NewTestLogger(tc),
50-
rtpConverter: newRTPConverter(int64(clockRate)),
51+
RTPConverter: rtputil.NewRTPConverter(int64(clockRate)),
5152
maxTsDiff: 200 * time.Millisecond,
5253
maxDriftAdjustment: 5 * time.Millisecond,
5354
}
@@ -110,7 +111,7 @@ func TestGetPTSWithoutRebase_Increasing(t *testing.T) {
110111

111112
// Simulate accepting two frames in order: 20ms and then 20ms later
112113
// Convert 20ms -> RTP ticks
113-
rtp20ms := ts.rtpConverter.toRTP(20 * time.Millisecond)
114+
rtp20ms := ts.ToRTP(20 * time.Millisecond)
114115

115116
now := time.Now()
116117
// First packet initializes lastTS path
@@ -154,7 +155,7 @@ func TestGetPTSWithoutRebase_NegativeAdjustedPTS(t *testing.T) {
154155
ts.initialize(firstPacket)
155156
require.Less(t, ts.currentPTSOffset, time.Duration(0), "expected negative PTS offset when synchronizer start is later")
156157

157-
stepTS := ts.rtpConverter.toRTP(10 * time.Millisecond)
158+
stepTS := ts.ToRTP(10 * time.Millisecond)
158159
secondPacket := jitter.ExtPacket{
159160
Packet: &rtp.Packet{
160161
Header: rtp.Header{Timestamp: firstPacket.Packet.Timestamp + stepTS, SequenceNumber: 2},
@@ -203,8 +204,8 @@ func TestGetPTSWithRebase_PropelsForward(t *testing.T) {
203204
ts.startRTP = 100000
204205
ts.lastTS = ts.startRTP
205206

206-
rtp500ms := ts.rtpConverter.toRTP(500 * time.Millisecond)
207-
rtp10ms := ts.rtpConverter.toRTP(10 * time.Millisecond)
207+
rtp500ms := ts.ToRTP(500 * time.Millisecond)
208+
rtp10ms := ts.ToRTP(10 * time.Millisecond)
208209

209210
// First packet (~500ms)
210211
ts1 := ts.startRTP + rtp500ms
@@ -279,7 +280,7 @@ func TestPrimeForStartWithStartGate(t *testing.T) {
279280
ts.sync = NewSynchronizerWithOptions()
280281

281282
stepDur := 20 * time.Millisecond
282-
step := ts.rtpConverter.toRTP(stepDur)
283+
step := ts.ToRTP(stepDur)
283284
baseTS := ts.startRTP
284285
base := time.Now()
285286

@@ -396,7 +397,7 @@ func TestNormalizePTSToMediaPipelineTimeline_FreshBehindDoesNotCorrect(t *testin
396397
ptsIn := running - delay - time.Second
397398
ts.lastPTS = ptsIn
398399
ts.lastPTSAdjusted = ptsIn
399-
sampleTS := ts.toRTP(ptsIn)
400+
sampleTS := ts.ToRTP(ptsIn)
400401
initialStartRTP := ts.startRTP
401402
initialTimely := mono.Now()
402403
ts.lastTimelyPacket = initialTimely
@@ -414,7 +415,7 @@ func TestNormalizePTSToMediaPipelineTimeline_CorrectsAfterLongLag(t *testing.T)
414415
ptsIn := running - delay - 5*time.Second
415416
ts.lastPTS = ptsIn
416417
ts.lastPTSAdjusted = ptsIn
417-
sampleTS := ts.toRTP(ptsIn)
418+
sampleTS := ts.ToRTP(ptsIn)
418419
ts.lastTimelyPacket = mono.Now().Add(-cMaxTimelyPacketAge - time.Second)
419420

420421
deadline, ok := ts.sync.getExternalMediaDeadline()

0 commit comments

Comments
 (0)