Skip to content

Commit e23ad8c

Browse files
committed
Bridger: Start RTMP2RTC bridger in RTMP publisher
1 parent d2a348e commit e23ad8c

File tree

5 files changed

+49
-138
lines changed

5 files changed

+49
-138
lines changed

trunk/src/app/srs_app_rtc_source.cpp

-72
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,6 @@ SrsRtcStream::SrsRtcStream()
337337
stream_desc_ = NULL;
338338

339339
req = NULL;
340-
bridger_ = new SrsRtcDummyBridger(this);
341340
}
342341

343342
SrsRtcStream::~SrsRtcStream()
@@ -347,7 +346,6 @@ SrsRtcStream::~SrsRtcStream()
347346
consumers.clear();
348347

349348
srs_freep(req);
350-
srs_freep(bridger_);
351349
srs_freep(stream_desc_);
352350
}
353351

@@ -408,11 +406,6 @@ SrsContextId SrsRtcStream::pre_source_id()
408406
return _pre_source_id;
409407
}
410408

411-
ISrsSourceBridger* SrsRtcStream::bridger()
412-
{
413-
return bridger_;
414-
}
415-
416409
srs_error_t SrsRtcStream::create_consumer(SrsRtcConsumer*& consumer)
417410
{
418411
srs_error_t err = srs_success;
@@ -477,16 +470,6 @@ srs_error_t SrsRtcStream::on_publish()
477470
is_created_ = true;
478471
is_delivering_packets_ = true;
479472

480-
// Create a new bridger, because it's been disposed when unpublish.
481-
#ifdef SRS_FFMPEG_FIT
482-
SrsRtcFromRtmpBridger* impl = new SrsRtcFromRtmpBridger(this);
483-
if ((err = impl->initialize(req)) != srs_success) {
484-
return srs_error_wrap(err, "bridge initialize");
485-
}
486-
487-
bridger_->setup(impl);
488-
#endif
489-
490473
// Notify the consumers about stream change event.
491474
if ((err = on_source_changed()) != srs_success) {
492475
return srs_error_wrap(err, "source id change");
@@ -522,11 +505,6 @@ void SrsRtcStream::on_unpublish()
522505
// release unpublish stream description.
523506
set_stream_desc(NULL);
524507

525-
// Dispose the impl of bridger, to free memory.
526-
#ifdef SRS_FFMPEG_FIT
527-
bridger_->setup(NULL);
528-
#endif
529-
530508
// TODO: FIXME: Handle by statistic.
531509
}
532510

@@ -1196,56 +1174,6 @@ srs_error_t SrsRtcFromRtmpBridger::consume_packets(vector<SrsRtpPacketCacheHelpe
11961174
}
11971175
#endif
11981176

1199-
SrsRtcDummyBridger::SrsRtcDummyBridger(SrsRtcStream* s)
1200-
{
1201-
rtc_ = s;
1202-
impl_ = NULL;
1203-
}
1204-
1205-
SrsRtcDummyBridger::~SrsRtcDummyBridger()
1206-
{
1207-
srs_freep(impl_);
1208-
}
1209-
1210-
srs_error_t SrsRtcDummyBridger::on_publish()
1211-
{
1212-
if (impl_) {
1213-
return impl_->on_publish();
1214-
}
1215-
return rtc_->on_publish();
1216-
}
1217-
1218-
srs_error_t SrsRtcDummyBridger::on_audio(SrsSharedPtrMessage* audio)
1219-
{
1220-
if (impl_) {
1221-
return impl_->on_audio(audio);
1222-
}
1223-
return srs_success;
1224-
}
1225-
1226-
srs_error_t SrsRtcDummyBridger::on_video(SrsSharedPtrMessage* video)
1227-
{
1228-
if (impl_) {
1229-
return impl_->on_video(video);
1230-
}
1231-
return srs_success;
1232-
}
1233-
1234-
void SrsRtcDummyBridger::on_unpublish()
1235-
{
1236-
if (impl_) {
1237-
impl_->on_unpublish();
1238-
return;
1239-
}
1240-
rtc_->on_unpublish();
1241-
}
1242-
1243-
void SrsRtcDummyBridger::setup(ISrsSourceBridger* impl)
1244-
{
1245-
srs_freep(impl_);
1246-
impl_ = impl;
1247-
}
1248-
12491177
SrsCodecPayload::SrsCodecPayload()
12501178
{
12511179
pt_of_publisher_ = pt_ = 0;

trunk/src/app/srs_app_rtc_source.hpp

-24
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ class SrsRtpRingBuffer;
5656
class SrsRtpNackForReceiver;
5757
class SrsJsonObject;
5858
class SrsErrorPithyPrint;
59-
class SrsRtcDummyBridger;
6059

6160
class SrsNtp
6261
{
@@ -177,8 +176,6 @@ class SrsRtcStream
177176
SrsContextId _pre_source_id;
178177
SrsRequest* req;
179178
ISrsRtcPublishStream* publish_stream_;
180-
// Transmux RTMP to RTC.
181-
SrsRtcDummyBridger* bridger_;
182179
// Steam description for this steam.
183180
SrsRtcStreamDescription* stream_desc_;
184181
private:
@@ -204,8 +201,6 @@ class SrsRtcStream
204201
// Get current source id.
205202
virtual SrsContextId source_id();
206203
virtual SrsContextId pre_source_id();
207-
// Get the bridger.
208-
ISrsSourceBridger* bridger();
209204
public:
210205
// Create consumer
211206
// @param consumer, output the create consumer.
@@ -293,25 +288,6 @@ class SrsRtcFromRtmpBridger : public ISrsSourceBridger
293288
};
294289
#endif
295290

296-
class SrsRtcDummyBridger : public ISrsSourceBridger
297-
{
298-
private:
299-
SrsRtcStream* rtc_;
300-
// The optional implementation bridger, ignore if NULL.
301-
ISrsSourceBridger* impl_;
302-
public:
303-
SrsRtcDummyBridger(SrsRtcStream* s);
304-
virtual ~SrsRtcDummyBridger();
305-
public:
306-
virtual srs_error_t on_publish();
307-
virtual srs_error_t on_audio(SrsSharedPtrMessage* audio);
308-
virtual srs_error_t on_video(SrsSharedPtrMessage* video);
309-
virtual void on_unpublish();
310-
public:
311-
// Setup a new implementation bridger, which might be NULL to free previous one.
312-
void setup(ISrsSourceBridger* impl);
313-
};
314-
315291
// TODO: FIXME: Rename it.
316292
class SrsCodecPayload
317293
{

trunk/src/app/srs_app_rtmp_conn.cpp

+36-11
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ using namespace std;
5555
#include <srs_app_statistic.hpp>
5656
#include <srs_protocol_utility.hpp>
5757
#include <srs_protocol_json.hpp>
58+
#include <srs_app_rtc_source.hpp>
5859

5960
// the timeout in srs_utime_t to wait encoder to republish
6061
// if timeout, close the connection.
@@ -959,23 +960,47 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsSource* source)
959960
srs_error_t err = srs_success;
960961

961962
SrsRequest* req = info->req;
962-
963+
964+
// Check whether RTC stream is busy.
965+
#ifdef SRS_RTC
966+
SrsRtcStream *rtc = NULL;
967+
bool rtc_server_enabled = _srs_config->get_rtc_server_enabled();
968+
bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost);
969+
if (rtc_server_enabled && rtc_enabled && !info->edge) {
970+
if ((err = _srs_rtc_sources->fetch_or_create(req, &rtc)) != srs_success) {
971+
return srs_error_wrap(err, "create source");
972+
}
973+
974+
if (!rtc->can_publish()) {
975+
return srs_error_new(ERROR_RTC_SOURCE_BUSY, "rtc stream %s busy", req->get_stream_url().c_str());
976+
}
977+
}
978+
#endif
979+
980+
// Check whether RTMP stream is busy.
963981
if (!source->can_publish(info->edge)) {
964982
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str());
965983
}
966-
967-
// when edge, ignore the publish event, directly proxy it.
968-
if (info->edge) {
969-
if ((err = source->on_edge_start_publish()) != srs_success) {
970-
return srs_error_wrap(err, "rtmp: edge start publish");
984+
985+
// Bridge to RTC streaming.
986+
#ifdef SRS_RTC
987+
if (rtc) {
988+
SrsRtcFromRtmpBridger *bridger = new SrsRtcFromRtmpBridger(rtc);
989+
if ((err = bridger->initialize(req)) != srs_success) {
990+
srs_freep(bridger);
991+
return srs_error_wrap(err, "bridger init");
971992
}
993+
994+
source->set_bridger(bridger);
995+
}
996+
#endif
997+
998+
// Start publisher now.
999+
if (info->edge) {
1000+
return source->on_edge_start_publish();
9721001
} else {
973-
if ((err = source->on_publish()) != srs_success) {
974-
return srs_error_wrap(err, "rtmp: source publish");
975-
}
1002+
return source->on_publish();
9761003
}
977-
978-
return err;
9791004
}
9801005

9811006
void SrsRtmpConn::release_publish(SrsSource* source)

trunk/src/app/srs_app_source.cpp

+11-29
Original file line numberDiff line numberDiff line change
@@ -1728,34 +1728,13 @@ srs_error_t SrsSourceManager::fetch_or_create(SrsRequest* r, ISrsSourceHandler*
17281728
// should always not exists for create a source.
17291729
srs_assert (pool.find(stream_url) == pool.end());
17301730

1731-
#ifdef SRS_RTC
1732-
bool rtc_server_enabled = _srs_config->get_rtc_server_enabled();
1733-
bool rtc_enabled = _srs_config->get_rtc_enabled(r->vhost);
1734-
1735-
// Get the RTC source and bridger.
1736-
SrsRtcStream* rtc = NULL;
1737-
if (rtc_server_enabled && rtc_enabled) {
1738-
if ((err = _srs_rtc_sources->fetch_or_create(r, &rtc)) != srs_success) {
1739-
err = srs_error_wrap(err, "init rtc %s", r->get_stream_url().c_str());
1740-
goto failed;
1741-
}
1742-
}
1743-
#endif
17441731
srs_trace("new source, stream_url=%s", stream_url.c_str());
17451732

17461733
source = new SrsSource();
17471734
if ((err = source->initialize(r, h)) != srs_success) {
17481735
err = srs_error_wrap(err, "init source %s", r->get_stream_url().c_str());
17491736
goto failed;
17501737
}
1751-
1752-
#ifdef SRS_RTC
1753-
// If rtc enabled, bridge RTMP source to RTC,
1754-
// all RTMP packets will be forwarded to RTC source.
1755-
if (source && rtc) {
1756-
source->bridge_to(rtc->bridger());
1757-
}
1758-
#endif
17591738

17601739
pool[stream_url] = source;
17611740
*pps = source;
@@ -1883,7 +1862,7 @@ SrsSource::SrsSource()
18831862
die_at = 0;
18841863

18851864
handler = NULL;
1886-
bridger = NULL;
1865+
bridger_ = NULL;
18871866

18881867
play_edge = new SrsPlayEdge();
18891868
publish_edge = new SrsPublishEdge();
@@ -1915,6 +1894,7 @@ SrsSource::~SrsSource()
19151894
srs_freep(gop_cache);
19161895

19171896
srs_freep(req);
1897+
srs_freep(bridger_);
19181898
}
19191899

19201900
void SrsSource::dispose()
@@ -1990,9 +1970,10 @@ srs_error_t SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
19901970
return err;
19911971
}
19921972

1993-
void SrsSource::bridge_to(ISrsSourceBridger* v)
1973+
void SrsSource::set_bridger(ISrsSourceBridger* v)
19941974
{
1995-
bridger = v;
1975+
srs_freep(bridger_);
1976+
bridger_ = v;
19961977
}
19971978

19981979
srs_error_t SrsSource::on_reload_vhost_play(string vhost)
@@ -2245,7 +2226,7 @@ srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
22452226
}
22462227

22472228
// For bridger to consume the message.
2248-
if (bridger && (err = bridger->on_audio(msg)) != srs_success) {
2229+
if (bridger_ && (err = bridger_->on_audio(msg)) != srs_success) {
22492230
return srs_error_wrap(err, "bridger consume audio");
22502231
}
22512232

@@ -2375,7 +2356,7 @@ srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
23752356
}
23762357

23772358
// For bridger to consume the message.
2378-
if (bridger && (err = bridger->on_video(msg)) != srs_success) {
2359+
if (bridger_ && (err = bridger_->on_video(msg)) != srs_success) {
23792360
return srs_error_wrap(err, "bridger consume video");
23802361
}
23812362

@@ -2539,7 +2520,7 @@ srs_error_t SrsSource::on_publish()
25392520
return srs_error_wrap(err, "handle publish");
25402521
}
25412522

2542-
if (bridger && (err = bridger->on_publish()) != srs_success) {
2523+
if (bridger_ && (err = bridger_->on_publish()) != srs_success) {
25432524
return srs_error_wrap(err, "bridger publish");
25442525
}
25452526

@@ -2584,8 +2565,9 @@ void SrsSource::on_unpublish()
25842565

25852566
handler->on_unpublish(this, req);
25862567

2587-
if (bridger) {
2588-
bridger->on_unpublish();
2568+
if (bridger_) {
2569+
bridger_->on_unpublish();
2570+
srs_freep(bridger_);
25892571
}
25902572

25912573
// no consumer, stream is die.

trunk/src/app/srs_app_source.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,7 @@ class SrsSource : public ISrsReloadHandler
534534
// The event handler.
535535
ISrsSourceHandler* handler;
536536
// The source bridger for other source.
537-
ISrsSourceBridger* bridger;
537+
ISrsSourceBridger* bridger_;
538538
// The edge control service
539539
SrsPlayEdge* play_edge;
540540
SrsPublishEdge* publish_edge;
@@ -562,7 +562,7 @@ class SrsSource : public ISrsReloadHandler
562562
// Initialize the hls with handlers.
563563
virtual srs_error_t initialize(SrsRequest* r, ISrsSourceHandler* h);
564564
// Bridge to other source, forward packets to it.
565-
void bridge_to(ISrsSourceBridger* v);
565+
void set_bridger(ISrsSourceBridger* v);
566566
// Interface ISrsReloadHandler
567567
public:
568568
virtual srs_error_t on_reload_vhost_play(std::string vhost);

0 commit comments

Comments
 (0)