Skip to content

Commit fdd6af8

Browse files
authored
Single peer connection support (#724)
* Test * WIP * WIP * WIP * WIP * deps * Anunay feedback
1 parent b0873be commit fdd6af8

File tree

7 files changed

+270
-156
lines changed

7 files changed

+270
-156
lines changed

engine.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424

2525
"github.com/pion/webrtc/v4"
2626
"go.uber.org/atomic"
27-
"golang.org/x/mod/semver"
2827
"google.golang.org/protobuf/encoding/protojson"
2928
"google.golang.org/protobuf/proto"
3029

@@ -72,6 +71,7 @@ type engineHandler interface {
7271
OnStreamTrailer(*livekit.DataStream_Trailer)
7372
OnLocalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed)
7473
OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate)
74+
OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement)
7575
}
7676

7777
type nullEngineHandler struct{}
@@ -103,6 +103,8 @@ func (n *nullEngineHandler) OnStreamTrailer(*livekit.DataStream_Trailer)
103103
func (n *nullEngineHandler) OnLocalTrackSubscribed(trackSubscribed *livekit.TrackSubscribed) {}
104104
func (n *nullEngineHandler) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.SubscribedQualityUpdate) {
105105
}
106+
func (n *nullEngineHandler) OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) {
107+
}
106108

107109
// -------------------------------------------
108110

@@ -125,6 +127,7 @@ const (
125127
type RTCEngine struct {
126128
log protoLogger.Logger
127129

130+
useSinglePeerConnection bool
128131
engineHandler engineHandler
129132
cbGetLocalParticipantSID func() string
130133

@@ -166,18 +169,20 @@ type RTCEngine struct {
166169
}
167170

168171
func NewRTCEngine(
172+
useSinglePeerConnection bool,
169173
engineHandler engineHandler,
170174
getLocalParticipantSID func() string,
171175
) *RTCEngine {
172176
e := &RTCEngine{
173177
log: logger,
178+
useSinglePeerConnection: useSinglePeerConnection,
174179
engineHandler: engineHandler,
175180
cbGetLocalParticipantSID: getLocalParticipantSID,
176181
trackPublishedListeners: make(map[string]chan *livekit.TrackPublishedResponse),
177182
joinTimeout: 15 * time.Second,
178183
reliableMsgSeq: 1,
179184
}
180-
if semver.Compare("v"+Version, "v3.0.0") < 0 {
185+
if !useSinglePeerConnection {
181186
e.signalling = signalling.NewSignalling(signalling.SignallingParams{
182187
Logger: e.log,
183188
})
@@ -299,7 +304,7 @@ func (e *RTCEngine) IsConnected() bool {
299304
e.pclock.Lock()
300305
defer e.pclock.Unlock()
301306

302-
if e.publisher == nil || e.subscriber == nil {
307+
if e.publisher == nil || (!e.useSinglePeerConnection && e.subscriber == nil) {
303308
return false
304309
}
305310
if e.subscriberPrimary {
@@ -445,6 +450,10 @@ func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration)
445450
}
446451

447452
func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration) error {
453+
if e.useSinglePeerConnection {
454+
return nil
455+
}
456+
448457
var err error
449458
if e.subscriber, err = NewPCTransport(PCTransportParams{
450459
Configuration: configuration,
@@ -817,6 +826,7 @@ func (e *RTCEngine) createSubscriberPCAnswerAndSend() error {
817826
e.log.Errorw("could not set subscriber local description", err)
818827
return err
819828
}
829+
e.log.Debugw("sending answer for subscriber", "answer", answer)
820830
if err := e.signalTransport.SendMessage(
821831
e.signalling.SignalSdpAnswer(
822832
protosignalling.ToProtoSessionDescription(answer, 0),
@@ -1242,14 +1252,18 @@ func (e *RTCEngine) OnReconnectResponse(res *livekit.ReconnectResponse) error {
12421252
e.pclock.Lock()
12431253
defer e.pclock.Unlock()
12441254

1245-
if err := e.publisher.SetConfiguration(configuration); err != nil {
1246-
e.log.Errorw("could not set rtc configuration for publisher", err)
1247-
return err
1255+
if e.publisher != nil {
1256+
if err := e.publisher.SetConfiguration(configuration); err != nil {
1257+
e.log.Errorw("could not set rtc configuration for publisher", err)
1258+
return err
1259+
}
12481260
}
12491261

1250-
if err := e.subscriber.SetConfiguration(configuration); err != nil {
1251-
e.log.Errorw("could not set rtc configuration for subscriber", err)
1252-
return err
1262+
if e.subscriber != nil {
1263+
if err := e.subscriber.SetConfiguration(configuration); err != nil {
1264+
e.log.Errorw("could not set rtc configuration for subscriber", err)
1265+
return err
1266+
}
12531267
}
12541268

12551269
return nil
@@ -1274,7 +1288,7 @@ func (e *RTCEngine) OnOffer(sd webrtc.SessionDescription, offerId uint32) {
12741288
return
12751289
}
12761290

1277-
e.log.Debugw("received offer for subscriber")
1291+
e.log.Debugw("received offer for subscriber", "offer", sd, "offerId", offerId)
12781292
if err := e.subscriber.SetRemoteDescription(sd); err != nil {
12791293
e.log.Errorw("could not set remote description", err)
12801294
return
@@ -1371,6 +1385,10 @@ func (e *RTCEngine) OnSubscribedQualityUpdate(subscribedQualityUpdate *livekit.S
13711385
e.engineHandler.OnSubscribedQualityUpdate(subscribedQualityUpdate)
13721386
}
13731387

1388+
func (e *RTCEngine) OnMediaSectionsRequirement(mediaSectionsRequirement *livekit.MediaSectionsRequirement) {
1389+
e.engineHandler.OnMediaSectionsRequirement(mediaSectionsRequirement)
1390+
}
1391+
13741392
// ------------------------------------
13751393

13761394
func setConfiguration(pcTransport *PCTransport, configuration webrtc.Configuration) {

go.mod

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,26 @@ require (
1010
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
1111
github.com/livekit/media-sdk v0.0.0-20250518151703-b07af88637c5
1212
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded
13-
github.com/livekit/protocol v1.39.4-0.20250807105828-ccbae8154e54
13+
github.com/livekit/protocol v1.40.1-0.20250826073447-c714707269e5
1414
github.com/magefile/mage v1.15.0
1515
github.com/pion/dtls/v3 v3.0.7
1616
github.com/pion/interceptor v0.1.40
1717
github.com/pion/rtcp v1.2.15
1818
github.com/pion/rtp v1.8.21
1919
github.com/pion/sdp/v3 v3.0.15
20-
github.com/pion/webrtc/v4 v4.1.3
20+
github.com/pion/webrtc/v4 v4.1.5-0.20250825162555-4b37165dcc27
2121
github.com/stretchr/testify v1.10.0
2222
github.com/twitchtv/twirp v8.1.3+incompatible
2323
go.uber.org/atomic v1.11.0
24-
golang.org/x/crypto v0.40.0
25-
golang.org/x/exp v0.0.0-20250718183923-645b1fa84792
26-
google.golang.org/protobuf v1.36.7
24+
golang.org/x/crypto v0.41.0
25+
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b
26+
google.golang.org/protobuf v1.36.8
2727
)
2828

29-
require golang.org/x/mod v0.26.0
29+
require golang.org/x/mod v0.27.0
3030

3131
require (
32-
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.6-20250717185734-6c6e0d3c608e.1 // indirect
32+
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.8-20250717185734-6c6e0d3c608e.1 // indirect
3333
buf.build/go/protovalidate v0.14.0 // indirect
3434
buf.build/go/protoyaml v0.6.0 // indirect
3535
cel.dev/expr v0.24.0 // indirect
@@ -38,7 +38,7 @@ require (
3838
github.com/benbjohnson/clock v1.3.5 // indirect
3939
github.com/cespare/xxhash/v2 v2.3.0 // indirect
4040
github.com/davecgh/go-spew v1.1.1 // indirect
41-
github.com/dennwc/iters v1.1.0 // indirect
41+
github.com/dennwc/iters v1.2.2 // indirect
4242
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
4343
github.com/frostbyte73/core v0.1.1 // indirect
4444
github.com/fsnotify/fsnotify v1.9.0 // indirect
@@ -54,7 +54,7 @@ require (
5454
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
5555
github.com/lithammer/shortuuid/v4 v4.2.0 // indirect
5656
github.com/livekit/psrpc v0.6.1-0.20250726180611-3915e005e741 // indirect
57-
github.com/nats-io/nats.go v1.44.0 // indirect
57+
github.com/nats-io/nats.go v1.45.0 // indirect
5858
github.com/nats-io/nkeys v0.4.11 // indirect
5959
github.com/nats-io/nuid v1.0.1 // indirect
6060
github.com/opencontainers/runc v1.1.14 // indirect
@@ -64,26 +64,26 @@ require (
6464
github.com/pion/mdns/v2 v2.0.7 // indirect
6565
github.com/pion/randutil v0.1.0 // indirect
6666
github.com/pion/sctp v1.8.39 // indirect
67-
github.com/pion/srtp/v3 v3.0.6 // indirect
67+
github.com/pion/srtp/v3 v3.0.7 // indirect
6868
github.com/pion/stun/v3 v3.0.0 // indirect
6969
github.com/pion/transport/v3 v3.0.7 // indirect
70-
github.com/pion/turn/v4 v4.0.2 // indirect
70+
github.com/pion/turn/v4 v4.1.1 // indirect
7171
github.com/pmezard/go-difflib v1.0.0 // indirect
7272
github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
73-
github.com/redis/go-redis/v9 v9.12.0 // indirect
73+
github.com/redis/go-redis/v9 v9.12.1 // indirect
7474
github.com/stoewer/go-strcase v1.3.1 // indirect
7575
github.com/wlynxg/anet v0.0.5 // indirect
7676
github.com/zeebo/xxh3 v1.0.2 // indirect
7777
go.uber.org/multierr v1.11.0 // indirect
7878
go.uber.org/zap v1.27.0 // indirect
7979
go.uber.org/zap/exp v0.3.0 // indirect
80-
golang.org/x/net v0.42.0 // indirect
80+
golang.org/x/net v0.43.0 // indirect
8181
golang.org/x/sync v0.16.0 // indirect
82-
golang.org/x/sys v0.34.0 // indirect
83-
golang.org/x/text v0.27.0 // indirect
84-
google.golang.org/genproto/googleapis/api v0.0.0-20250804133106-a7a43d27e69b // indirect
85-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250804133106-a7a43d27e69b // indirect
86-
google.golang.org/grpc v1.74.2 // indirect
82+
golang.org/x/sys v0.35.0 // indirect
83+
golang.org/x/text v0.28.0 // indirect
84+
google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect
85+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect
86+
google.golang.org/grpc v1.75.0 // indirect
8787
gopkg.in/hraban/opus.v2 v2.0.0-20230925203106-0188a62cb302 // indirect
8888
gopkg.in/yaml.v3 v3.0.1 // indirect
8989
)

0 commit comments

Comments
 (0)