diff --git a/endpoint/programs/stream_testing/streams_program_2.cpp b/endpoint/programs/stream_testing/streams_program_2.cpp index 7d763258..fd40d248 100644 --- a/endpoint/programs/stream_testing/streams_program_2.cpp +++ b/endpoint/programs/stream_testing/streams_program_2.cpp @@ -57,15 +57,12 @@ int main(int argc, char** argv) { auto streamlist = streamApi.listStreams(streamRoomId); std::vector streamsId; for(int i = 0; i < streamlist.size(); i++) { - streamsId.push_back(stream::StreamSubscription{streamlist[i].streamId, std::nullopt}); + streamsId.push_back(stream::StreamSubscription{streamlist[i].id, std::nullopt}); } stream::StreamSettings ssettings; streamApi.subscribeToRemoteStreams(streamRoomId, streamsId, ssettings); - std::this_thread::sleep_for(std::chrono::seconds(120)); - streamApi.unpublishStream(streamId_1); - streamApi.unsubscribeFromRemoteStreams(streamRoomId, streamsId); - streamApi.leaveStreamRoom(streamRoomId); + std::this_thread::sleep_for(std::chrono::seconds(60)); std::this_thread::sleep_for(std::chrono::seconds(1)); diff --git a/endpoint/programs/stream_testing/streams_program_3_reciver.cpp b/endpoint/programs/stream_testing/streams_program_3_reciver.cpp index 4e05a59b..a601848e 100644 --- a/endpoint/programs/stream_testing/streams_program_3_reciver.cpp +++ b/endpoint/programs/stream_testing/streams_program_3_reciver.cpp @@ -4,14 +4,20 @@ #include #include #include +#include +#include #include #include #include +#include #include #include +#include #include +#include +#include #include -#include +#include #include using namespace std; @@ -19,45 +25,111 @@ using namespace privmx::endpoint; class RTCVideoRendererImpl { public: - RTCVideoRendererImpl(const std::string& title) : title("PrivMX Stream - " + title) {} - void OnFrame(int64_t w, int64_t h, std::shared_ptr frame, const std::string id) { - if (renderer == NULL) { - window = SDL_CreateWindow(title.c_str(), SDL_WINDOWPOS_CENTERED, SDL_WINDOWPOS_CENTERED, 768, 432, 0); - renderer = SDL_CreateRenderer(window, -1, SDL_RENDERER_ACCELERATED); - texture = SDL_CreateTexture(renderer, SDL_PIXELFORMAT_RGBA8888, SDL_TEXTUREACCESS_STREAMING, 768, 432); - windowEventsLoop(); - } - uint32_t* pixels = new uint32_t[768 * 432]; - // PRIVMX_DEBUG("RTCVideoRenderer", "OnFrame", "Frame size: "+std::to_string(w) +"-"+std::to_string(h)) - frame->ConvertToRGBA((uint8_t*)pixels, 4, 768, 432); - SDL_UpdateTexture(texture, NULL, pixels, 768 * sizeof(uint32_t)); - SDL_RenderClear(renderer); SDL_RenderCopy(renderer, texture, NULL, NULL); SDL_RenderPresent(renderer); - delete pixels; - + RTCVideoRendererImpl( + const std::string& title, int width, int height + ) : + _title("PrivMX Stream - " + title), _width(width), _height(height), + _window(nullptr), _texture(nullptr), _renderer(nullptr), + _framePixels(std::vector(_width*_height)), _hasNewFrame(false), + _stop(false), _SDLMainThread(std::thread(&RTCVideoRendererImpl::main, this)) + {} + + ~RTCVideoRendererImpl() { + _stop = true; + if (_SDLMainThread.joinable()) _SDLMainThread.join(); } + void OnFrame(int64_t w, int64_t h, std::shared_ptr frame, const std::string& id) + { + std::lock_guard lock(_frameMutex); + frame->ConvertToRGBA( + reinterpret_cast(_framePixels.data()), + 4, _width, _height + ); + + _hasNewFrame = true; + } + private: - void windowEventsLoop() { - eventThreadLoop = std::make_shared([&](){ - while(true) { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - SDL_Event event; - while (SDL_PollEvent(&event)) { - if (event.type == SDL_QUIT) exit(0); - if (event.type == SDL_WINDOWEVENT) { - if (event.window.event == SDL_WINDOWEVENT_CLOSE) - { - exit(1); - } - } + void initializeSDL() { + if (SDL_Init(SDL_INIT_VIDEO | SDL_INIT_TIMER) != 0) { + LOG_FATAL("SDL_Init Error: " , SDL_GetError()) + throw "SDL_Error"; + } + _window = SDL_CreateWindow(_title.c_str(), SDL_WINDOWPOS_CENTERED, SDL_WINDOWPOS_CENTERED, _width, _height, 0); + if (!_window) { + LOG_FATAL("SDL_CreateWindow Error: " , SDL_GetError()) + SDL_Quit(); + throw "SDL_Error"; + } + _renderer = SDL_CreateRenderer(_window, -1, SDL_RENDERER_ACCELERATED); + if (!_renderer) { + LOG_FATAL("SDL_CreateRenderer Error: " , SDL_GetError()) + SDL_DestroyWindow(_window); + SDL_Quit(); + throw "SDL_Error"; + } + _texture = SDL_CreateTexture(_renderer, SDL_PIXELFORMAT_RGBA8888, SDL_TEXTUREACCESS_STREAMING, _width, _height); + if (!_texture) { + LOG_FATAL("SDL_CreateTexture Error: " , SDL_GetError()) + SDL_DestroyRenderer(_renderer); + SDL_DestroyWindow(_window); + SDL_Quit(); + throw "SDL_Error"; + } + } + void main() { + initializeSDL(); + const int frameDelayMs = 1000 / 24; // 24 Frames per second + while (!_stop) { + Uint32 startTicks = SDL_GetTicks64(); + SDL_Event event; + while (SDL_PollEvent(&event)) { + if (event.type == SDL_QUIT || + (event.type == SDL_WINDOWEVENT && + event.window.event == SDL_WINDOWEVENT_CLOSE)) { + _stop = true; + exit(0); } } - }); + { + std::lock_guard lock(_frameMutex); + if (_hasNewFrame) { + SDL_UpdateTexture( + _texture, + nullptr, + _framePixels.data(), + _width * sizeof(uint32_t) + ); + _hasNewFrame = false; + } + } + SDL_RenderClear(_renderer); + SDL_RenderCopy(_renderer, _texture, nullptr, nullptr); + SDL_RenderPresent(_renderer); + + Uint32 elapsed = SDL_GetTicks64() - startTicks; + if (elapsed < (Uint32)frameDelayMs) SDL_Delay(frameDelayMs - elapsed); + } + if (_texture) SDL_DestroyTexture(_texture); + if (_renderer) SDL_DestroyRenderer(_renderer); + if (_window) SDL_DestroyWindow(_window); + SDL_Quit(); } - SDL_Window* window; - SDL_Texture* texture; - SDL_Renderer* renderer = NULL; - std::string title; - std::shared_ptr eventThreadLoop; + + SDL_Window* _window; + SDL_Texture* _texture; + SDL_Renderer* _renderer; + + std::string _title; + int _width; + int _height; + + std::mutex _frameMutex; + std::vector _framePixels; + bool _hasNewFrame; + + std::atomic_bool _stop; + std::thread _SDLMainThread; }; static vector getParamsList(int argc, char* argv[]) { @@ -65,6 +137,35 @@ static vector getParamsList(int argc, char* argv[]) { return args; } +class OnTrackImpl : public stream::OnTrackInterface { +public: + OnTrackImpl() : _renderer(RTCVideoRendererImpl("Remote", 768, 432)) {} + virtual void OnRemoteTrack(stream::Track tack, stream::TrackAction action) override { + if(tack.kind == stream::DataType::AUDIO) { + LOG_INFO("OnRemoteTrack[stream::TrackAction] DataType::AUDIO : ", (action == stream::TrackAction::ADDED ? "ADDED" : "REMOVED")); + if(action == stream::TrackAction::ADDED && !tack.muted) { + tack.updateMute(true); + } + } + if(tack.kind == stream::DataType::VIDEO) { + LOG_INFO("OnRemoteTrack[stream::TrackAction] DataType::VIDEO : ", (action == stream::TrackAction::ADDED ? "ADDED" : "REMOVED")); + } + } + virtual void OnData(std::shared_ptr data) override { + if(data->type == stream::DataType::VIDEO) { + auto videoData = std::dynamic_pointer_cast(data); + // LOG_INFO("VideoData[w-h]: ", videoData->w, "-", videoData->h); + _renderer.OnFrame(videoData->w, videoData->h, videoData->frameData, "test"); + } + if(data->type == stream::DataType::AUDIO) { + auto audioData = std::dynamic_pointer_cast(data); + // LOG_INFO("AudioData[w-h]"); + } + } +private: + RTCVideoRendererImpl _renderer; +}; + int main(int argc, char** argv) { auto params = getParamsList(argc, argv); if(params.size() != 5) { @@ -84,25 +185,69 @@ int main(int argc, char** argv) { ); event::EventApi eventApi = event::EventApi::create(connection); stream::StreamApi streamApi = stream::StreamApi::create(connection, eventApi); - RTCVideoRendererImpl r = RTCVideoRendererImpl("Remote"); - stream::StreamSettings ssettings { - .OnFrame=[&](int64_t w, int64_t h, std::shared_ptr frame, const std::string id) { - r.OnFrame(w, h, frame, id); - } - }; - auto streamlist = streamApi.listStreams(streamRoomId); + core::EventQueue eventQueue = core::EventQueue::getInstance(); + auto onTrack = std::make_shared(); + streamApi.setOnTrackInterface(onTrack); + stream::StreamSettings ssettings {}; + streamApi.subscribeFor({ + streamApi.buildSubscriptionQuery(stream::EventType::STREAMROOM_UPDATE, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + streamApi.buildSubscriptionQuery(stream::EventType::STREAMROOM_DELETE, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + streamApi.buildSubscriptionQuery(stream::EventType::STREAM_PUBLISH, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + streamApi.buildSubscriptionQuery(stream::EventType::STREAM_UNPUBLISH, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + streamApi.buildSubscriptionQuery(stream::EventType::STREAM_JOIN, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + streamApi.buildSubscriptionQuery(stream::EventType::STREAM_LEAVE, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + }); std::vector streamsId; - for(int i = 0; i < streamlist.size(); i++) { - std::cout << "streamlist[" << i << "]:" << streamlist[i].streamId << std::endl; - streamsId.push_back(stream::StreamSubscription{streamlist[i].streamId, std::nullopt}); + { + auto streamlist = streamApi.listStreams(streamRoomId); + for(auto stream : streamlist) { + std::cout << "stream:" << stream.id << std::endl; + for(auto track : stream.tracks) { + streamsId.push_back(stream::StreamSubscription{stream.id, track.mid}); + } + break; + } } streamApi.joinStreamRoom(streamRoomId); streamApi.subscribeToRemoteStreams(streamRoomId, streamsId, ssettings); + + auto eventThread = std::thread([&](){ + while (true) { + auto eventHolder = eventQueue.waitEvent(); + if(core::Events::isLibBreakEvent(eventHolder)) return; + if(stream::Events::isStreamUpdatedEvent(eventHolder)) { + auto streamUpdatedEvent = stream::Events::extractStreamUpdatedEvent(eventHolder).data; + auto streamsModified = streamUpdatedEvent.streamsModified; + + std::vector toAddstreamsId; + std::vector toRemovestreamsId; + for(auto stream : streamsModified) { + for(auto track : stream.tracks) { + if(track.after.has_value()) { + if(track.after.value().disabled.has_value()) { + toRemovestreamsId.push_back({stream.streamId, track.after.value().mid}); + } else { + toAddstreamsId.push_back({stream.streamId, track.after.value().mid}); + } + } + + } + } + if(toAddstreamsId.size() > 0 || toRemovestreamsId.size() > 0) { + streamApi.modifyRemoteStreamsSubscriptions(streamRoomId, toAddstreamsId, toRemovestreamsId, ssettings); + } + } + } + }); - while (true) {std::this_thread::sleep_for(std::chrono::seconds(5));} + while (true) { + std::this_thread::sleep_for(std::chrono::seconds(5)); + } streamApi.unsubscribeFromRemoteStreams(streamRoomId, streamsId); std::this_thread::sleep_for(std::chrono::seconds(5)); + eventQueue.emitBreakEvent(); + if(eventThread.joinable()) eventThread.join(); connection.disconnect(); } catch (const core::Exception& e) { diff --git a/endpoint/programs/stream_testing/streams_program_3_sender.cpp b/endpoint/programs/stream_testing/streams_program_3_sender.cpp index a11393be..24f51590 100644 --- a/endpoint/programs/stream_testing/streams_program_3_sender.cpp +++ b/endpoint/programs/stream_testing/streams_program_3_sender.cpp @@ -7,59 +7,17 @@ #include #include #include +#include #include #include +#include #include #include -#include -#include +#include using namespace std; using namespace privmx::endpoint; - -class RTCVideoRendererImpl { -public: - RTCVideoRendererImpl(const std::string& title) : title("PrivMX Stream - " + title) {} - void OnFrame(int64_t w, int64_t h, std::shared_ptr frame, const std::string id) { - if (renderer == NULL) { - window = SDL_CreateWindow(title.c_str(), SDL_WINDOWPOS_CENTERED, SDL_WINDOWPOS_CENTERED, 768, 432, 0); - renderer = SDL_CreateRenderer(window, -1, SDL_RENDERER_ACCELERATED); - texture = SDL_CreateTexture(renderer, SDL_PIXELFORMAT_RGBA8888, SDL_TEXTUREACCESS_STREAMING, 768, 432); - windowEventsLoop(); - } - uint32_t* pixels = new uint32_t[768 * 432]; - // std::cout << w << " - " << h << " frame Size" << std::endl; - frame->ConvertToRGBA((uint8_t*)pixels, 4, 768, 432); - SDL_UpdateTexture(texture, NULL, pixels, 768 * sizeof(uint32_t)); - SDL_RenderClear(renderer); SDL_RenderCopy(renderer, texture, NULL, NULL); SDL_RenderPresent(renderer); - delete pixels; - } -private: - void windowEventsLoop() { - eventThreadLoop = std::make_shared([&](){ - while(true) { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - SDL_Event event; - while (SDL_PollEvent(&event)) { - if (event.type == SDL_QUIT) exit(0); - if (event.type == SDL_WINDOWEVENT) { - if (event.window.event == SDL_WINDOWEVENT_CLOSE) - { - exit(1); - } - } - } - } - }); - } - SDL_Window* window; - SDL_Texture* texture; - SDL_Renderer* renderer = NULL; - std::string title; - std::shared_ptr eventThreadLoop; -}; - static vector getParamsList(int argc, char* argv[]) { vector args(argv + 1, argv + argc); return args; @@ -76,15 +34,36 @@ int main(int argc, char** argv) { std::string bridgeUrl = {params[2].begin(), params[2].end()}; std::string contextId = {params[3].begin(), params[3].end()}; std::string streamRoomId = {params[4].begin(), params[4].end()}; + atomic_bool stop = false; + core::EventQueue eventQueue = core::EventQueue::getInstance(); + std::thread eventThread; try { core::Connection connection = core::Connection::connect( privKey, solutionId, bridgeUrl ); + eventThread = std::thread([&](){ + while (!stop) { + auto eventHolder = eventQueue.waitEvent(); + } + }); event::EventApi eventApi = event::EventApi::create(connection); stream::StreamApi streamApi = stream::StreamApi::create(connection, eventApi); + + streamApi.subscribeFor({ + streamApi.buildSubscriptionQuery(stream::EventType::STREAMROOM_UPDATE, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + streamApi.buildSubscriptionQuery(stream::EventType::STREAMROOM_DELETE, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + streamApi.buildSubscriptionQuery(stream::EventType::STREAM_PUBLISH, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + streamApi.buildSubscriptionQuery(stream::EventType::STREAM_UNPUBLISH, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + streamApi.buildSubscriptionQuery(stream::EventType::STREAM_JOIN, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + streamApi.buildSubscriptionQuery(stream::EventType::STREAM_LEAVE, stream::EventSelectorType::STREAMROOM_ID, streamRoomId), + }); + streamApi.joinStreamRoom(streamRoomId); + // for(int j = 0; j < 5; j++) { + // streamApi.joinStreamRoom(streamRoomId); + auto streamHandle = streamApi.createStream(streamRoomId); auto mediaDevices = streamApi.getMediaDevices(); for(const auto& mediaDevice: mediaDevices) { @@ -99,24 +78,79 @@ int main(int argc, char** argv) { break; } } + // for(const auto& mediaDevice: mediaDevices) { + // if(mediaDevice.type == stream::DeviceType::Desktop) { + // streamApi.addTrack(streamHandle, mediaDevice); + // break; + // } + // } streamApi.publishStream(streamHandle); - while (true) {std::this_thread::sleep_for(std::chrono::seconds(5));} + int i = 20; + while (i--) { + // std::this_thread::sleep_for(std::chrono::seconds(10)); + // std::cout << "-------------------------------------------------------------------------------------------------------" << std::endl; + // for(const auto& mediaDevice: mediaDevices) { + // if(mediaDevice.type == stream::DeviceType::Video) { + // streamApi.removeTrack(streamHandle, mediaDevice); + // break; + // } + // } + // std::this_thread::sleep_for(std::chrono::seconds(5)); + // std::cout << "----------------------------------------------remove track---------------------------------------------" << std::endl; + // core::VarSerializer serializer = core::VarSerializer(core::VarSerializer::Options{false, core::VarSerializer::Options::STD_STRING}); + // streamApi.updateStream(streamHandle); + // std::cout << "----------------------------------------------streams list---------------------------------------------" << std::endl; + // auto streamsList = streamApi.listStreams(streamRoomId); + // std::cout << privmx::utils::Utils::stringifyVar(serializer.serialize(streamsList), true) << std::endl; + // std::cout << "-------------------------------------------------------------------------------------------------------" << std::endl; + // auto mediaDevices = streamApi.getMediaDevices(); + // for(const auto& mediaDevice: mediaDevices) { + // if(mediaDevice.type == stream::DeviceType::Video) { + // streamApi.addTrack(streamHandle, mediaDevice); + // break; + // } + // } + // std::this_thread::sleep_for(std::chrono::seconds(5)); + // std::cout << "----------------------------------------------add track------------------------------------------------" << std::endl; + + // streamApi.updateStream(streamHandle); + }; + std::this_thread::sleep_for(std::chrono::seconds(600)); streamApi.unpublishStream(streamHandle); + // streamApi.leaveStreamRoom(streamRoomId); + + std::this_thread::sleep_for(std::chrono::seconds(2)); + // } + + + + streamApi.leaveStreamRoom(streamRoomId); connection.disconnect(); + stop = true; + eventQueue.emitBreakEvent(); + eventThread.join(); std::this_thread::sleep_for(std::chrono::seconds(5)); } catch (const core::Exception& e) { cerr << e.getFull() << endl; + stop = true; + if(eventThread.joinable()) {eventQueue.emitBreakEvent(); eventThread.join();} } catch (const privmx::utils::PrivmxException& e) { cerr << e.what() << endl; cerr << e.getData() << endl; cerr << e.getCode() << endl; + stop = true; + if(eventThread.joinable()) {eventQueue.emitBreakEvent(); eventThread.join();} } catch (const exception& e) { cerr << e.what() << endl; + stop = true; + if(eventThread.joinable()) {eventQueue.emitBreakEvent(); eventThread.join();} } catch (...) { cerr << "Error" << endl; + stop = true; + if(eventThread.joinable()) {eventQueue.emitBreakEvent(); eventThread.join();} } return 0; diff --git a/endpoint/stream/stream/include/privmx/endpoint/stream/StreamApiLowImpl.hpp b/endpoint/stream/stream/include/privmx/endpoint/stream/StreamApiLowImpl.hpp index d93d0336..072cd406 100644 --- a/endpoint/stream/stream/include/privmx/endpoint/stream/StreamApiLowImpl.hpp +++ b/endpoint/stream/stream/include/privmx/endpoint/stream/StreamApiLowImpl.hpp @@ -104,8 +104,8 @@ class StreamApiLowImpl : public privmx::utils::ManualManagedClass streamHandle; }; struct StreamRoomData { - StreamRoomData(std::shared_ptr _streamKeyManager, const std::string _streamRoomId, std::shared_ptr _webRtc): - streamKeyManager(_streamKeyManager), streamRoomId(_streamRoomId), webRtc(_webRtc) + StreamRoomData(std::shared_ptr _streamKeyManager, const std::string _streamRoomId, std::shared_ptr _webRtc, const std::vector& _subscriptionsIds): + streamKeyManager(_streamKeyManager), streamRoomId(_streamRoomId), webRtc(_webRtc), subscriptionsIds(_subscriptionsIds) { keyUpdateCallbackId = streamKeyManager->addKeyUpdateCallback([_webRtc, _streamRoomId](const std::vector keys) { _webRtc->updateKeys(_streamRoomId, keys); @@ -117,6 +117,7 @@ class StreamApiLowImpl : public privmx::utils::ManualManagedClass webRtc; int64_t keyUpdateCallbackId; + std::vector subscriptionsIds; }; // if streamMap is empty after leave, unpublish StreamRoomData should, be removed. void onNotificationEvent(const std::string& type, const core::NotificationEvent& notification); diff --git a/endpoint/stream/stream/include/privmx/endpoint/stream/SubscriberImpl.hpp b/endpoint/stream/stream/include/privmx/endpoint/stream/SubscriberImpl.hpp index 984fc096..7fa70cdd 100644 --- a/endpoint/stream/stream/include/privmx/endpoint/stream/SubscriberImpl.hpp +++ b/endpoint/stream/stream/include/privmx/endpoint/stream/SubscriberImpl.hpp @@ -25,7 +25,7 @@ class SubscriberImpl : public privmx::endpoint::core::Subscriber SubscriberImpl(privmx::privfs::RpcGateway::Ptr gateway) : Subscriber(gateway) {} static std::string buildQuery(EventType eventType, EventSelectorType selectorType, const std::string& selectorId); - static std::string getInternalEventsSubscriptionQuery(); + static std::string getInternalEventsSubscriptionQuery(const std::optional& streamRoomId = std::nullopt); private: virtual privmx::utils::List transform(const std::vector& subscriptionQueries); virtual void assertQuery(const std::vector& subscriptionQueries); diff --git a/endpoint/stream/stream/include_pub/privmx/endpoint/stream/Types.hpp b/endpoint/stream/stream/include_pub/privmx/endpoint/stream/Types.hpp index c5acd6fd..2ea45f49 100644 --- a/endpoint/stream/stream/include_pub/privmx/endpoint/stream/Types.hpp +++ b/endpoint/stream/stream/include_pub/privmx/endpoint/stream/Types.hpp @@ -92,16 +92,6 @@ class Frame { virtual int ConvertToRGBA(uint8_t* dst_argb, int dst_stride_argb, int dest_width, int dest_height) = 0; }; -struct StreamSettings { - Settings settings; - std::optional> OnVideo; - std::optional, const std::string&)>> OnFrame; - std::optional> OnVideoRemove; - bool dropCorruptedFrames = true; -}; - - - enum DeviceType { Audio = 0, Video = 1, @@ -112,6 +102,11 @@ struct MediaDevice { std::string name; std::string id; DeviceType type; + // bool enabled; +}; + +struct MediaTrack { + std::function setEnabled; }; enum EventType: int64_t { diff --git a/endpoint/stream/stream/src/StreamApiLowImpl.cpp b/endpoint/stream/stream/src/StreamApiLowImpl.cpp index 2a2e37de..36d98520 100644 --- a/endpoint/stream/stream/src/StreamApiLowImpl.cpp +++ b/endpoint/stream/stream/src/StreamApiLowImpl.cpp @@ -63,7 +63,7 @@ StreamApiLowImpl::StreamApiLowImpl( _notificationListenerId = _eventMiddleware->addNotificationEventListener(std::bind(&StreamApiLowImpl::onNotificationEvent, this, std::placeholders::_1, std::placeholders::_2)); _connectedListenerId = _eventMiddleware->addConnectedEventListener(std::bind(&StreamApiLowImpl::processConnectedEvent, this)); _disconnectedListenerId = _eventMiddleware->addDisconnectedEventListener(std::bind(&StreamApiLowImpl::processDisconnectedEvent, this)); - + // auto internalSubscriptionQuery {_subscriber.getInternalEventsSubscriptionQuery()}; auto result = _subscriber.subscribeFor({internalSubscriptionQuery}, true); _eventMiddleware->notificationEventListenerAddSubscriptionIds(_notificationListenerId, result); @@ -259,10 +259,17 @@ std::shared_ptr Stre auto model = privmx::utils::TypedObjectFactory::createNewObject(); model.id(streamRoomId); auto streamRoom = _serverApi->streamRoomGet(model).streamRoom(); + + // setup event listener + auto internalSubscriptionQuery {_subscriber.getInternalEventsSubscriptionQuery(streamRoomId)}; + std::vector subscriptionsIds = _subscriber.subscribeFor({internalSubscriptionQuery}, true); + _eventMiddleware->notificationEventListenerAddSubscriptionIds(_notificationListenerId, subscriptionsIds); + std::shared_ptr streamRoomData = std::make_shared( std::make_shared(_eventApi, _keyProvider, _serverApi, _userPrivKey, streamRoomId, streamRoom.contextId(), _notificationListenerId), streamRoomId, - webRtc + webRtc, + subscriptionsIds ); _streamRoomMap.set( streamRoomId, @@ -293,30 +300,34 @@ void StreamApiLowImpl::joinStreamRoom(const std::string& streamRoomId, std::shar _serverApi->streamRoomJoin(model); } void StreamApiLowImpl::leaveStreamRoom(const std::string& streamRoomId) { - auto model = privmx::utils::TypedObjectFactory::createNewObject(); - model.streamRoomId(streamRoomId); - _serverApi->streamRoomLeave(model); - auto room = getStreamRoomData(streamRoomId); + // close event listener + auto internalSubscriptionQuery {_subscriber.getInternalEventsSubscriptionQuery(room->streamRoomId)}; + _subscriber.unsubscribeFrom(room->subscriptionsIds); + _eventMiddleware->notificationEventListenerRemoveSubscriptionIds(_notificationListenerId, room->subscriptionsIds); + + LOG_DEBUG("StreamApiLowImpl:leaveStreamRoom", "gently close of streams"); if(room->publisherStream) { - //gently close publisherStream if(room->publisherStream->streamHandle.has_value()) { _streamHandleToRoomId.erase(room->publisherStream->streamHandle.value()); } } if(room->subscriberStream) { - //gently close subscriberStream if(room->subscriberStream->streamHandle.has_value()) { _streamHandleToRoomId.erase(room->subscriberStream->streamHandle.value()); } } - room->streamKeyManager->removeKeyUpdateCallback(room->keyUpdateCallbackId); //kill all webRTC pearConnections room->webRtc->close(room->streamRoomId); //stop StreamKeyManager + room->streamKeyManager->removeKeyUpdateCallback(room->keyUpdateCallbackId); room->streamKeyManager.reset(); // Final clenup _streamRoomMap.erase(streamRoomId); + // send laveRequest + auto model = privmx::utils::TypedObjectFactory::createNewObject(); + model.streamRoomId(streamRoomId); + _serverApi->streamRoomLeave(model); } void StreamApiLowImpl::enableStreamRoomRecording(const std::string& streamRoomId) { @@ -359,6 +370,7 @@ StreamPublishResult StreamApiLowImpl::publishStream(const StreamHandle& streamHa auto model = utils::TypedObjectFactory::createNewObject(); model.streamRoomId(room->streamRoomId); model.offer(sessionDescription); + std::cout << privmx::utils::Utils::stringifyVar(model) << std::endl; auto result = _serverApi->streamPublish(model); streamData->sessionId = result.sessionId(); // update/set sessionId in webrtc (for Janus - trickle) @@ -432,7 +444,7 @@ void StreamApiLowImpl::unpublishStream(const StreamHandle& streamHandle) { model.sessionId(streamData->sessionId.value()); _serverApi->streamUnpublish(model); } - room->webRtc->close(room->streamRoomId); + // room->webRtc->close(room->streamRoomId); _streamHandleToRoomId.erase(streamHandle); room->publisherStream.reset(); } @@ -834,7 +846,6 @@ std::vector StreamApiLowImpl::decryptAndConvertStreamRoomsDataToStre std::map duplication_check; for (auto streamRoom: streamRooms) { try { - auto tmp = decryptAndConvertStreamRoomDataToStreamRoom( streamRoom, streamRoom.data().get(streamRoom.data().size()-1), @@ -1065,24 +1076,25 @@ void StreamApiLowImpl::sendStreamKeyRequest(std::shared_ptrset("$in", usersIds); query->set("#userId", queryId); + if(usersIds->size() != 0) { + core::PagingList userInfoList = _connection->listContextUsers( + streamRoom.contextId(), + core::PagingQuery{ + .skip = 0, + .limit = static_cast(usersIds->size()), + .sortOrder = "desc", + .lastId = std::nullopt, + .sortBy = std::nullopt, + .queryAsJson = privmx::utils::Utils::stringify(query) + } + ); - core::PagingList userInfoList = _connection->listContextUsers( - streamRoom.contextId(), - core::PagingQuery{ - .skip = 0, - .limit = static_cast(usersIds->size()), - .sortOrder = "desc", - .lastId = std::nullopt, - .sortBy = std::nullopt, - .queryAsJson = privmx::utils::Utils::stringify(query) + + std::vector toSend; + for(auto userInfo: userInfoList.readItems) { + LOG_DEBUG("StreamApiLowImpl::sendStreamKeyRequest Request Send: " + userInfo.user.userId) + toSend.push_back(userInfo.user); } - ); - - - std::vector toSend; - for(auto userInfo: userInfoList.readItems) { - LOG_DEBUG("StreamApiLowImpl::sendStreamKeyRequest Request Send: " + userInfo.user.userId) - toSend.push_back(userInfo.user); + room->streamKeyManager->requestKey(toSend); } - room->streamKeyManager->requestKey(toSend); } diff --git a/endpoint/stream/stream/src/StreamKeyManager.cpp b/endpoint/stream/stream/src/StreamKeyManager.cpp index 37c50abe..83daa93b 100644 --- a/endpoint/stream/stream/src/StreamKeyManager.cpp +++ b/endpoint/stream/stream/src/StreamKeyManager.cpp @@ -16,9 +16,9 @@ limitations under the License. #include #include -#define UPDATE_INTERVAL 1000*1 +#define UPDATE_INTERVAL 1000*5 #define MAX_UPDATE_TIMEOUT 1000*5 -#define MAX_STD_KEY_TTL 1000*15+(MAX_UPDATE_TIMEOUT*2)+UPDATE_INTERVAL +#define MAX_STD_KEY_TTL 1000*120+(MAX_UPDATE_TIMEOUT*2)+UPDATE_INTERVAL using namespace privmx::endpoint::stream; @@ -31,7 +31,7 @@ StreamKeyManager::StreamKeyManager( const std::string& contextId, int notificationListenerId ) : _eventApi(eventApi), _keyProvider(keyProvider), _serverApi(serverApi), _userPrivKey(userPrivKey), _streamRoomId(streamRoomId), _contextId(contextId), _notificationListenerId(notificationListenerId) { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "Constructed") + LOG_TRACE("StreamKeyManager::Constructed") _userPubKey = _userPrivKey.getPublicKey(); // generate curren key auto currentKey = _keyProvider->generateKey(); @@ -59,16 +59,16 @@ StreamKeyManager::StreamKeyManager( } } if(!key || (key->creation_time + key->TTL - std::chrono::milliseconds(MAX_UPDATE_TIMEOUT+UPDATE_INTERVAL) < std::chrono::system_clock::now())) { - updateKey(); + if(!token->isCancelled()) updateKey(); } } } catch (const core::Exception& e) { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "_keyUpdater core::Exception: " + e.getFull()); + LOG_ERROR("StreamKeyManager::_keyUpdater core::Exception: " + e.getFull()); e.rethrow(); } catch (const privmx::utils::OperationCancelledException& e) { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "_keyUpdater stop"); + LOG_TRACE("StreamKeyManager::_keyUpdater stop"); } catch (const std::exception& e) { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "_keyUpdater std::exception: " + std::string(e.what())); + LOG_ERROR("StreamKeyManager::_keyUpdater std::exception: " + std::string(e.what())); throw e; } }, _cancellationToken); @@ -86,7 +86,7 @@ StreamKeyManager::~StreamKeyManager() { _eventApi->unsubscribeFromInternal(_subscriptionIds, _notificationListenerId); } if(_keyUpdater.joinable()) _keyUpdater.join(); - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "Successfully Deconstructed : " + _streamRoomId); + LOG_TRACE("StreamKeyManager::Successfully Deconstructed : " + _streamRoomId); } std::vector StreamKeyManager::getCurrentWebRtcKeys() { return _currentWebRtcKeys; @@ -108,18 +108,18 @@ void StreamKeyManager::removeKeyUpdateCallback(int64_t keyUpdateCallbackId) { } void StreamKeyManager::respondToEvent(dynamic::StreamKeyManagementEvent event, const std::string& userId, const std::string& userPubKey) { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "respondToEvent data: " + privmx::utils::Utils::stringifyVar(event)); + LOG_TRACE("StreamKeyManager::respondToEvent data: " + privmx::utils::Utils::stringifyVar(event)); if(event.subtype() == "RequestKeyEvent") { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "Responding RequestKeyEvent"); + LOG_TRACE("StreamKeyManager::Responding RequestKeyEvent"); respondToRequestKey(userId, userPubKey); } else if(event.subtype() == "RequestKeyRespondEvent") { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "Responding RequestKeyRespondEvent"); + LOG_TRACE("StreamKeyManager::Responding RequestKeyRespondEvent"); setRequestKeyResult(privmx::utils::TypedObjectFactory::createObjectFromVar(event)); } else if(event.subtype() == "UpdateKeyEvent") { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "Responding UpdateKeyEvent"); + LOG_TRACE("StreamKeyManager::Responding UpdateKeyEvent"); respondToUpdateRequest(privmx::utils::TypedObjectFactory::createObjectFromVar(event), userId, userPubKey); } else if(event.subtype() == "UpdateKeyACKEvent") { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "Responding UpdateKeyACKEvent"); + LOG_TRACE("StreamKeyManager::Responding UpdateKeyACKEvent"); respondUpdateKeyConfirmation(privmx::utils::TypedObjectFactory::createObjectFromVar(event), userPubKey); } } @@ -134,9 +134,7 @@ void StreamKeyManager::requestKey(const std::vector(); streamEncKey.keyId(currentKey->key.id); streamEncKey.key(privmx::utils::Base64::from(currentKey->key.key)); @@ -150,19 +148,19 @@ void StreamKeyManager::respondToRequestKey(const std::string& userId, const std: auto userWithPubKey = privmx::endpoint::core::UserWithPubKey{.userId=userId, .pubKey=userPubKey}; sendStreamKeyManagementEvent(respond, {userWithPubKey}); - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "respondToRequestKey sendStreamKeyManagementEvent Done"); + LOG_TRACE("StreamKeyManager::respondToRequestKey sendStreamKeyManagementEvent Done"); bool hasUser = false; std::unique_lock lock(_connectedUsersMutex); - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "respondToRequestKey _connectedUsers.size():" + std::to_string(_connectedUsers.size())); + LOG_TRACE("StreamKeyManager::respondToRequestKey _connectedUsers.size():" + std::to_string(_connectedUsers.size())); for(auto& connectedUser : _connectedUsers) { if(userWithPubKey.pubKey == connectedUser.pubKey && userWithPubKey.userId == connectedUser.userId) { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "respondToRequestKey _connectedUsers user in list: " + userWithPubKey.userId); + LOG_TRACE("StreamKeyManager::respondToRequestKey _connectedUsers user in list: " + userWithPubKey.userId); hasUser = true; break; } } if(!hasUser) { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "respondToRequestKey _connectedUsers user added: " + userWithPubKey.userId); + LOG_TRACE("StreamKeyManager::respondToRequestKey _connectedUsers user added: " + userWithPubKey.userId); _connectedUsers.push_back(userWithPubKey); } } @@ -184,7 +182,7 @@ void StreamKeyManager::setRequestKeyResult(dynamic::RequestKeyRespondEvent resul ) ); } - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "setRequestKeyResult updateWebRtcKeyStore"); + LOG_TRACE("StreamKeyManager::setRequestKeyResult updateWebRtcKeyStore"); updateWebRtcKeyStore(); } @@ -213,7 +211,7 @@ void StreamKeyManager::updateKey() { for(auto user : _connectedUsers) { users += " " + user.userId; } - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "Update-Key to users: " + users); + LOG_TRACE("StreamKeyManager::Update-Key to users: " + users); for(auto user : _connectedUsers) { _userUpdateKeyConfirmationStatus.insert_or_assign(user.pubKey, false); } @@ -233,9 +231,9 @@ void StreamKeyManager::updateKey() { std::unique_lock lock(_updateKeyMutex); if(wait) { - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "updateKey _updateKeyCV.wait_for"); + LOG_TRACE("StreamKeyManager::updateKey _updateKeyCV.wait_for"); auto status = _updateKeyCV.wait_for(lock, std::chrono::milliseconds(MAX_UPDATE_TIMEOUT)); - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", status == std::cv_status::timeout ? "updateKey _updateKeyCV.wait_for: timeout" : "updateKey _updateKeyCV.wait_for: ack form everyone"); + LOG_TRACE("StreamKeyManager", status == std::cv_status::timeout ? "updateKey _updateKeyCV.wait_for: timeout" : "updateKey _updateKeyCV.wait_for: ack form everyone"); } if(!_cancellationToken->isCancelled()) { { @@ -243,7 +241,7 @@ void StreamKeyManager::updateKey() { _keysStrage.insert_or_assign(_keyForUpdate->key.id, _keyForUpdate); _currentKeyId = _keyForUpdate->key.id; } - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "updateKey updateWebRtcKeyStore"); + LOG_TRACE("StreamKeyManager::updateKey updateWebRtcKeyStore"); updateWebRtcKeyStore(); } } @@ -265,7 +263,7 @@ void StreamKeyManager::respondToUpdateRequest(dynamic::UpdateKeyEvent request, c ) ); } - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "respondToUpdateRequest updateWebRtcKeyStore"); + LOG_TRACE("StreamKeyManager::respondToUpdateRequest updateWebRtcKeyStore"); updateWebRtcKeyStore(); // prepare ack data dynamic::UpdateKeyACKEvent ack = privmx::utils::TypedObjectFactory::createNewObject(); @@ -322,10 +320,8 @@ void StreamKeyManager::sendStreamKeyManagementEvent(dynamic::StreamCustomEventDa data.streamRoomId(_streamRoomId); event::InternalContextEventDataV1 event = {.type="StreamKeyManagementEvent", .data= privmx::endpoint::core::Buffer::from(utils::Utils::stringifyVar(data))}; - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "sendStreamKeyManagementEvent data: " + privmx::utils::Utils::stringifyVar(data)); + LOG_TRACE("StreamKeyManager::sendStreamKeyManagementEvent data: " + privmx::utils::Utils::stringifyVar(data)); _eventApi->emitEventInternal(_contextId, event, users); - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "sendStreamKeyManagementEvent data send"); - } void StreamKeyManager::updateWebRtcKeyStore() { @@ -333,7 +329,7 @@ void StreamKeyManager::updateWebRtcKeyStore() { std::vector webRtcKeys; { std::shared_lock lock(_keysStrageMutex); - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "updateWebRtcKeyStore key_list_size : " + std::to_string(_keysStrage.size())); + LOG_TRACE("StreamKeyManager::updateWebRtcKeyStore key_list_size : " + std::to_string(_keysStrage.size())); for(auto& key: _keysStrage) { privmx::endpoint::stream::Key webRtcKey { .keyId = key.second->key.id, @@ -346,7 +342,7 @@ void StreamKeyManager::updateWebRtcKeyStore() { _currentWebRtcKeys = webRtcKeys; if(!disableKeyUpdateForEncryptors) { std::shared_lock lock(_webRtcKeyUpdateCallbacksMutex); // <- T1 - PRIVMX_DEBUG("STREAMS", "KEY-MANAGER", "updateWebRtcKeyStore updating list int webrtc") + LOG_TRACE("StreamKeyManager::updateWebRtcKeyStore updating list int webrtc") for(auto& webRtcKeyUpdateCallback: _webRtcKeyUpdateCallbacks) { // <- T2 webRtcKeyUpdateCallback.second(_currentWebRtcKeys); }; diff --git a/endpoint/stream/stream/src/StreamVarDeserializer.cpp b/endpoint/stream/stream/src/StreamVarDeserializer.cpp index b7efa19d..9b036a60 100644 --- a/endpoint/stream/stream/src/StreamVarDeserializer.cpp +++ b/endpoint/stream/stream/src/StreamVarDeserializer.cpp @@ -98,7 +98,7 @@ stream::StreamTrackInfo VarDeserializer::deserialize(co .mindex = deserialize(obj->get("mindex"), name + ".mindex"), .mid = deserialize(obj->get("mid"), name + ".mid"), .disabled = {obj->has("disabled") ? std::make_optional(deserialize(obj->get("disabled"), name + ".disabled")) : std::nullopt}, - .codec = {obj->has("codec") ? std::make_optional(deserialize(obj->get("codec"), name + ".codec")): std::nullopt}, + .codec = {obj->has("codec") ? std::make_optional(deserialize(obj->get("codec"), name + ".codec")) : std::nullopt}, .description = {obj->has("description") ? std::make_optional(deserialize(obj->get("description"), name + ".description")) : std::nullopt}, .moderated = {obj->has("moderated") ? std::make_optional(deserialize(obj->get("moderated"), name + ".moderated")) : std::nullopt}, .simulcast = {obj->has("simulcast") ? std::make_optional(deserialize(obj->get("simulcast"), name + ".simulcast")) : std::nullopt}, @@ -204,7 +204,6 @@ template<> stream::StreamPublishedEventData VarDeserializer::deserialize(const Poco::Dynamic::Var& val, const std::string& name) { TypeValidator::validateObject(val, name); Poco::JSON::Object::Ptr obj = val.extract(); - return { .streamRoomId = deserialize(obj->get("streamRoomId"), name + ".streamRoomId"), .stream = deserialize(obj->get("stream"), name + ".stream"), diff --git a/endpoint/stream/stream/src/SubscriberImpl.cpp b/endpoint/stream/stream/src/SubscriberImpl.cpp index c965f196..f4e6ec2b 100644 --- a/endpoint/stream/stream/src/SubscriberImpl.cpp +++ b/endpoint/stream/stream/src/SubscriberImpl.cpp @@ -62,8 +62,8 @@ std::string SubscriberImpl::getSelector(EventSelectorType selectorType, const st return "|" + _selectorTypeNames.at(selectorType) + "=" + selectorId; } -std::string SubscriberImpl::getInternalEventsSubscriptionQuery() { - return std::string(_moduleName) + "/internal"; +std::string SubscriberImpl::getInternalEventsSubscriptionQuery(const std::optional& streamRoomId) { + return std::string(_moduleName) + "/internal" + (streamRoomId.has_value() ? getSelector(EventSelectorType::STREAMROOM_ID, streamRoomId.value()) : ""); } std::string SubscriberImpl::buildQuery(EventType eventType, EventSelectorType selectorType, const std::string& selectorId) { diff --git a/endpoint/stream/webrtc/include/privmx/endpoint/stream/PeerConnectionManager.hpp b/endpoint/stream/webrtc/include/privmx/endpoint/stream/PeerConnectionManager.hpp index 8139157e..419596a2 100644 --- a/endpoint/stream/webrtc/include/privmx/endpoint/stream/PeerConnectionManager.hpp +++ b/endpoint/stream/webrtc/include/privmx/endpoint/stream/PeerConnectionManager.hpp @@ -45,6 +45,7 @@ struct VideoTrackInfo { struct PeerConnection { libwebrtc::scoped_refptr pc; std::shared_ptr observer; + libwebrtc::scoped_refptr mediaStream; std::map audioTracks; std::map videoTracks; std::shared_mutex trackMutex; @@ -52,9 +53,17 @@ struct PeerConnection { }; struct JanusConnection { - JanusConnection(std::shared_ptr peerConnection_, int64_t sessionId_, bool hasSubscriptions_) : - peerConnection(peerConnection_), sessionId(sessionId_), hasSubscriptions(hasSubscriptions_) {} + JanusConnection( + std::shared_ptr _peerConnection, + int64_t _sessionId, + bool _hasSubscriptions + ) : + peerConnection(_peerConnection), + sessionId(_sessionId), + hasSubscriptions(_hasSubscriptions) + {} std::shared_ptr peerConnection; + libwebrtc::scoped_refptr mediaStream; int64_t sessionId; bool hasSubscriptions; }; @@ -68,6 +77,8 @@ class PeerConnectionManager { void updateSessionForConnection(const std::string& streamRoomId, ConnectionType connectionType, const int64_t sessionId); bool hasConnection(const std::string& streamRoomId, ConnectionType connectionType); std::shared_ptr getConnectionWithSession(const std::string& streamRoomId, ConnectionType connectionType); + void closeConnection(const std::string& streamRoomId, ConnectionType connectionType); + void closeSession(const std::string& streamRoomId); private: std::function(const std::string&)> _createPeerConnection; std::function _onTrickle; diff --git a/endpoint/stream/webrtc/include/privmx/endpoint/stream/PmxPeerConnectionObserver.hpp b/endpoint/stream/webrtc/include/privmx/endpoint/stream/PmxPeerConnectionObserver.hpp index 19cae538..0e146f72 100644 --- a/endpoint/stream/webrtc/include/privmx/endpoint/stream/PmxPeerConnectionObserver.hpp +++ b/endpoint/stream/webrtc/include/privmx/endpoint/stream/PmxPeerConnectionObserver.hpp @@ -17,7 +17,9 @@ limitations under the License. #include #include #include "privmx/endpoint/stream/webrtc/Types.hpp" +#include "privmx/endpoint/stream/webrtc/OnTrackInterface.hpp" #include +#include #include namespace privmx { @@ -36,15 +38,64 @@ class FrameImpl : public Frame { template class RTCVideoRendererImpl : public libwebrtc::RTCVideoRenderer { public: - inline RTCVideoRendererImpl(std::function, const std::string&)> onFrameCallback, const std::string& id) : _onFrameCallback(onFrameCallback), _id(id) {} + inline RTCVideoRendererImpl(std::shared_ptr onTrackInterface, const std::vector& streamIds, libwebrtc::scoped_refptr track) + : _onTrackInterface(onTrackInterface), _streamIds(streamIds), _track(track) { + LOG_TRACE("RTCVideoRendererImpl created") + } + inline RTCVideoRendererImpl(const std::vector& streamIds, libwebrtc::scoped_refptr track) + : _onTrackInterface(nullptr), _streamIds(streamIds), _track(track) { + LOG_TRACE("RTCVideoRendererImpl created") + } + inline ~RTCVideoRendererImpl() { + LOG_TRACE("RTCVideoRendererImpl destroyed") + } + void updateOnTrackInterface(std::shared_ptr onTrackInterface) { + std::unique_lock lock(m); + _onTrackInterface = onTrackInterface; + } virtual void OnFrame(VideoFrameT frame) override { + if(_onTrackInterface) { + std::unique_lock lock(m); + std::shared_ptr videoData = std::make_unique(DataType::VIDEO, _streamIds, _track->id().std_string(), frame->width(), frame->height(), std::make_shared(frame)); + _onTrackInterface->OnData(videoData); + } + } +private: + std::mutex m; + std::shared_ptr _onTrackInterface; + std::vector _streamIds; + libwebrtc::scoped_refptr _track; +}; + +class AudioTrackSinkImpl : public libwebrtc::AudioTrackSink { +public: + inline AudioTrackSinkImpl(std::shared_ptr onTrackInterface, const std::vector& streamIds, libwebrtc::scoped_refptr track) + : _onTrackInterface(onTrackInterface), _streamIds(streamIds), _track(track) { + LOG_TRACE("AudioTrackSinkImpl created") + } + inline AudioTrackSinkImpl(const std::vector& streamIds, libwebrtc::scoped_refptr track) + : _onTrackInterface(nullptr), _streamIds(streamIds), _track(track) { + LOG_TRACE("AudioTrackSinkImpl created") + } + inline ~AudioTrackSinkImpl() { + LOG_TRACE("AudioTrackSinkImpl destroyed") + } + void updateOnTrackInterface(std::shared_ptr onTrackInterface) { std::unique_lock lock(m); - _onFrameCallback(frame->width(), frame->height(), std::make_shared(frame), _id); + _onTrackInterface = onTrackInterface; + } + virtual void OnData(const void* audio_data, int bits_per_sample, int sample_rate, size_t number_of_channels, size_t number_of_frames) override { + if(_onTrackInterface) { + std::unique_lock lock(m); + std::shared_ptr audioData = std::make_unique(DataType::AUDIO, _streamIds, _track->id().std_string(), audio_data, bits_per_sample, sample_rate, number_of_channels, number_of_frames); + _onTrackInterface->OnData(audioData); + } } private: - std::function, const std::string&)> _onFrameCallback; std::mutex m; - std::string _id; + std::shared_ptr _onTrackInterface; + std::vector _streamIds; + libwebrtc::scoped_refptr _track; }; class PmxPeerConnectionObserver : public libwebrtc::RTCPeerConnectionObserver { @@ -53,7 +104,8 @@ class PmxPeerConnectionObserver : public libwebrtc::RTCPeerConnectionObserver { libwebrtc::scoped_refptr peerConnectionFactory, const std::string& streamRoomId, std::shared_ptr keys, - const privmx::webrtc::FrameCryptorOptions& options + const privmx::webrtc::FrameCryptorOptions& options, + std::shared_ptr onTrackInterface = nullptr ); void OnSignalingState(libwebrtc::RTCSignalingState state) override; void OnPeerConnectionState(libwebrtc::RTCPeerConnectionState state) override; @@ -71,46 +123,24 @@ class PmxPeerConnectionObserver : public libwebrtc::RTCPeerConnectionObserver { void UpdateCurrentKeys(std::shared_ptr newKeys); void SetFrameCryptorOptions(privmx::webrtc::FrameCryptorOptions options); - inline void setOnSignalingState(std::function callback) {_onSignalingState = callback;} - inline void setOnPeerConnectionState(std::function callback) {_onPeerConnectionState = callback;} - inline void setOnIceGatheringState(std::function callback) {_onIceGatheringState = callback;} - inline void setOnIceConnectionState(std::function callback) {_onIceConnectionState = callback;} inline void setOnIceCandidate(std::function)> callback) {_onIceCandidate = callback;} - inline void setOnAddStream(std::function)> callback) {_onAddStream = callback;} - inline void setOnRemoveStream(std::function)> callback) {_onRemoveStream = callback;} - inline void setOnDataChannel(std::function)> callback) {_onDataChannel = callback;} - inline void setOnRenegotiationNeeded(std::function callback) {_onRenegotiationNeeded = callback;} - inline void setOnTrack(std::function)> callback) {_onTrack = callback;} - inline void setOnAddTrack(std::function> streams, libwebrtc::scoped_refptr)> callback) {_onAddTrack = callback;} - inline void setOnRemoveTrack(std::function)> callback) {_onRemoveTrack = callback;} - - inline void setOnFrame(std::function, const std::string&)> callback) {_onFrameCallback = callback;} - inline void setOnVideoTrack(std::function callback) {_onVideoTrack = callback;} - inline void setOnRemoveVideoTrack(std::function callback) {_onRemoveVideoTrack = callback;} + void setOnTrackInterface(std::shared_ptr onTrackInterface); + private: libwebrtc::scoped_refptr _peerConnectionFactory; std::string _streamRoomId; std::shared_ptr _currentKeys; - std::optional, const std::string&)>> _onFrameCallback; - std::optional> _onVideoTrack; - std::optional> _onRemoveVideoTrack; - privmx::utils::ThreadSaveMap> _frameCryptors; privmx::webrtc::FrameCryptorOptions _options; - std::optional> _onSignalingState; - std::optional> _onPeerConnectionState; - std::optional> _onIceGatheringState; - std::optional> _onIceConnectionState; - std::optional)>> _onIceCandidate; - std::optional)>> _onAddStream; - std::optional)>> _onRemoveStream; - std::optional)>> _onDataChannel; - std::optional> _onRenegotiationNeeded; - std::optional)>> _onTrack; + std::shared_ptr _onTrackInterface; + privmx::utils::ThreadSaveMap>>> _RTCVideoRenderers; + privmx::utils::ThreadSaveMap> _audioTrackSinks; + privmx::utils::ThreadSaveMap> _frameCryptors; - std::optional>, libwebrtc::scoped_refptr)>> _onAddTrack; - std::optional)>> _onRemoveTrack; + std::optional)>> _onIceCandidate; + // tmp + libwebrtc::scoped_refptr tmpTrack; }; } // stream diff --git a/endpoint/stream/webrtc/include/privmx/endpoint/stream/StreamApiImpl.hpp b/endpoint/stream/webrtc/include/privmx/endpoint/stream/StreamApiImpl.hpp index fb6f5b04..19f3b1e5 100644 --- a/endpoint/stream/webrtc/include/privmx/endpoint/stream/StreamApiImpl.hpp +++ b/endpoint/stream/webrtc/include/privmx/endpoint/stream/StreamApiImpl.hpp @@ -28,12 +28,15 @@ limitations under the License. #include "privmx/endpoint/stream/StreamApiLow.hpp" #include "privmx/endpoint/stream/WebRTCImpl.hpp" #include "privmx/endpoint/stream/PeerConnectionManager.hpp" +#include "privmx/endpoint/stream/webrtc/OnTrackInterface.hpp" #include #include #include #include #include #include +#include +#include #include namespace privmx { @@ -83,8 +86,8 @@ class StreamApiImpl { void enableStreamRoomRecording(const std::string& streamRoomId); StreamHandle createStream(const std::string& streamRoomId); std::vector getMediaDevices(); - void addTrack(const StreamHandle& streamHandle, const MediaDevice& track); - void removeTrack(const StreamHandle& streamHandle, const MediaDevice& track); + MediaTrack addTrack(const StreamHandle& streamHandle, const MediaDevice& mediaDevice); + void removeTrack(const StreamHandle& streamHandle, const MediaDevice& mediaDevice); StreamPublishResult publishStream(const StreamHandle& streamHandle); StreamPublishResult updateStream(const StreamHandle& streamHandle); void unpublishStream(const StreamHandle& streamHandle); @@ -92,6 +95,7 @@ class StreamApiImpl { void modifyRemoteStreamsSubscriptions(const std::string& streamRoomId, const std::vector& subscriptionsToAdd, const std::vector& subscriptionsToRemove, const StreamSettings& options); void unsubscribeFromRemoteStreams(const std::string& streamRoomId, const std::vector& subscriptionsToRemove); void dropBrokenFrames(const std::string& streamRoomId, bool enable); + void setOnTrackInterface(std::shared_ptr onTrackInterface); private: @@ -99,26 +103,113 @@ class StreamApiImpl { Offline = 0, Online = 1 }; + + enum TrackStatus { + ToAdd = 0, + ToRemove = 1, + Published = 2, + }; + + struct StreamAudioTrackInfo { + StreamAudioTrackInfo( + const libwebrtc::scoped_refptr& _device, + const std::string& _deviceName, + const std::string& _deviceId, + const libwebrtc::scoped_refptr& _source, + const libwebrtc::scoped_refptr& _track, + const TrackStatus& _status + ) : + device(_device), + deviceName(_deviceName), + deviceId(_deviceId), + source(_source), + track(_track), + status(_status) + {} + libwebrtc::scoped_refptr device; + std::string deviceName; + std::string deviceId; + libwebrtc::scoped_refptr source; + libwebrtc::scoped_refptr track; + TrackStatus status; + }; + + struct StreamVideoTrackInfo { + StreamVideoTrackInfo( + const libwebrtc::scoped_refptr& _device, + const std::string& _deviceName, + const std::string& _deviceId, + const libwebrtc::scoped_refptr& _capturer, + const libwebrtc::scoped_refptr& _source, + const libwebrtc::scoped_refptr& _track, + const TrackStatus& _status + ) : + device(_device), + deviceName(_deviceName), + deviceId(_deviceId), + capturer(_capturer), + source(_source), + track(_track), + status(_status) + {} + libwebrtc::scoped_refptr device; + std::string deviceName; + std::string deviceId; + libwebrtc::scoped_refptr capturer; + libwebrtc::scoped_refptr source; + libwebrtc::scoped_refptr track; + TrackStatus status; + }; + struct StreamDesktopTrackInfo { + StreamDesktopTrackInfo( + const libwebrtc::scoped_refptr& _device, + const std::string& _deviceName, + const std::string& _deviceId, + const libwebrtc::scoped_refptr& _capturer, + const libwebrtc::scoped_refptr& _source, + const libwebrtc::scoped_refptr& _track, + const TrackStatus& _status + ) : + device(_device), + deviceName(_deviceName), + deviceId(_deviceId), + capturer(_capturer), + source(_source), + track(_track), + status(_status) + {} + libwebrtc::scoped_refptr device; + std::string deviceName; + std::string deviceId; + libwebrtc::scoped_refptr capturer; + libwebrtc::scoped_refptr source; + libwebrtc::scoped_refptr track; + TrackStatus status; + }; + struct StreamData { StreamData( - privmx::utils::ThreadSaveMap> _streamCapturers, + utils::ThreadSaveMap> _audioTracks, + utils::ThreadSaveMap> _videoTracks, StreamStatus _status, std::string _streamRoomId - ) :streamCapturers(_streamCapturers), status(_status), streamRoomId(_streamRoomId) {} - privmx::utils::ThreadSaveMap> streamCapturers; + ) : + audioTracks(_audioTracks), + videoTracks(_videoTracks), + status(_status), + streamRoomId(_streamRoomId) + {} + utils::ThreadSaveMap> audioTracks; + utils::ThreadSaveMap> videoTracks; + utils::ThreadSaveMap> desktopTracks; StreamStatus status; - std::mutex streamMutex; std::string streamRoomId; + std::mutex streamMutex; }; - - int64_t generateNumericId(); - - void trackAddAudio(int64_t streamId, int64_t id = 0, const std::string& params_JSON = "{}"); - void trackAddVideo(int64_t streamId, int64_t id = 0, const std::string& params_JSON = "{}"); - void trackAddDesktop(int64_t streamId, int64_t id = 0, const std::string& params_JSON = "{}"); - void trackRemoveAudio(int64_t streamId, int64_t id = 0); - void trackRemoveVideo(int64_t streamId, int64_t id = 0); - void trackRemoveDesktop(int64_t streamId, int64_t id = 0); + inline std::string getTrimmedString(std::string s) { + s.erase(std::find(s.begin(), s.end(), '\0'), s.end()); + return s; + } // v3 webrtc libwebrtc::scoped_refptr _peerConnectionFactory; diff --git a/endpoint/stream/webrtc/include/privmx/endpoint/stream/WebRTCImpl.hpp b/endpoint/stream/webrtc/include/privmx/endpoint/stream/WebRTCImpl.hpp index 78e5426b..68e103c2 100644 --- a/endpoint/stream/webrtc/include/privmx/endpoint/stream/WebRTCImpl.hpp +++ b/endpoint/stream/webrtc/include/privmx/endpoint/stream/WebRTCImpl.hpp @@ -19,6 +19,7 @@ limitations under the License. #include "privmx/endpoint/stream/WebRTCInterface.hpp" #include "privmx/endpoint/stream/PeerConnectionManager.hpp" +#include "privmx/endpoint/stream/webrtc/OnTrackInterface.hpp" #include #include #include @@ -44,7 +45,6 @@ class WebRTCImpl : public WebRTCInterface privmx::webrtc::FrameCryptorOptions _frameCryptorOptions ); ~WebRTCImpl(); - std::string createOfferAndSetLocalDescription(const std::string& streamRoomId) override; std::string createAnswerAndSetDescriptions(const std::string& streamRoomId, const std::string& sdp, const std::string& type) override; void setAnswerAndSetRemoteDescription(const std::string& streamRoomId, const std::string& sdp, const std::string& type) override; @@ -52,18 +52,32 @@ class WebRTCImpl : public WebRTCInterface void close(const std::string& streamRoomId) override; void updateKeys(const std::string& streamRoomId, const std::vector& keys) override; - void AddAudioTrack(const std::string& streamRoomId, libwebrtc::scoped_refptr audioTrack, std::string id = 0); - void AddVideoTrack(const std::string& streamRoomId, libwebrtc::scoped_refptr videoTrack, std::string id = 0); - void RemoveAudioTrack(const std::string& streamRoomId, std::string id = 0); - void RemoveVideoTrack(const std::string& streamRoomId, std::string id = 0); void setFrameCryptorOptions(const std::string& streamRoomId, const privmx::webrtc::FrameCryptorOptions& frameCryptorOptions); - void setOnFrame(const std::string& streamRoomId, std::function, const std::string&)> OnFrame); - void setOnVideoTrack(const std::string& streamRoomId, std::function OnVideoTrack); - void setOnRemoveVideoTrack(const std::string& streamRoomId, std::function OnRemoveVideoTrack); + void setOnTrackInterface(const std::string& streamRoomId, std::shared_ptr OnTrackInterface); + + void createPeerConnectionWithLocalStream( + const std::string& streamRoomId, + const std::vector>>& audioTracks, + const std::vector>>& videoTracks + ); + void updatePeerConnectionWithLocalStream( + const std::string& streamRoomId, + const std::vector>>& audioTracksToAdd, + const std::vector>>& videoTracksToAdd, + const std::vector>>& audioTracksToRemove, + const std::vector>>& videoTracksToRemove + ); + + void setOnTrackInterface(std::shared_ptr onTrackInterface); private: + void AddAudioTrack(std::shared_ptr jc, libwebrtc::scoped_refptr audioTrack, std::string id = 0); + void AddVideoTrack(std::shared_ptr jc, libwebrtc::scoped_refptr videoTrack, std::string id = 0); + void RemoveAudioTrack(std::shared_ptr jc, std::string id = 0); + void RemoveVideoTrack(std::shared_ptr jc, std::string id = 0); + int64_t addKeyUpdateCallback(std::function)> keyUpdateCallback); void removeKeyUpdateCallback(int64_t keyUpdateCallbackId); @@ -77,6 +91,7 @@ class WebRTCImpl : public WebRTCInterface privmx::webrtc::FrameCryptorOptions _frameCryptorOptions; std::shared_ptr _peerConnectionManager; privmx::utils::ThreadSaveMap> _roomKeys; + std::shared_ptr _onTrackInterface; }; } // namespace stream diff --git a/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/StreamApi.hpp b/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/StreamApi.hpp index 103259ce..226e7da3 100644 --- a/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/StreamApi.hpp +++ b/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/StreamApi.hpp @@ -21,7 +21,7 @@ limitations under the License. #include #include #include "privmx/endpoint/stream/webrtc/Types.hpp" -#include "webrtc/Types.hpp" +#include "privmx/endpoint/stream/webrtc/OnTrackInterface.hpp" namespace privmx { namespace endpoint { @@ -73,7 +73,7 @@ class StreamApi { void enableStreamRoomRecording(const std::string& streamRoomId); StreamHandle createStream(const std::string& streamRoomId); std::vector getMediaDevices(); - void addTrack(const StreamHandle& streamHandle, const MediaDevice& track); + MediaTrack addTrack(const StreamHandle& streamHandle, const MediaDevice& track); void removeTrack(const StreamHandle& streamHandle, const MediaDevice& track); StreamPublishResult publishStream(const StreamHandle& streamHandle); StreamPublishResult updateStream(const StreamHandle& streamHandle); @@ -83,6 +83,8 @@ class StreamApi { void unsubscribeFromRemoteStreams(const std::string& streamRoomId, const std::vector& subscriptionsToRemove); void dropBrokenFrames(const std::string& streamRoomId, bool enable); + void setOnTrackInterface(std::shared_ptr onTrackInterface); + std::shared_ptr getImpl() const { return _impl; } private: diff --git a/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/webrtc/OnTrackInterface.hpp b/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/webrtc/OnTrackInterface.hpp new file mode 100644 index 00000000..9213b540 --- /dev/null +++ b/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/webrtc/OnTrackInterface.hpp @@ -0,0 +1,74 @@ +#ifndef _PRIVMXLIB_ENDPOINT_STREAM_ONTRACKINTERFACE_HPP +#define _PRIVMXLIB_ENDPOINT_STREAM_ONTRACKINTERFACE_HPP + +#include +#include +#include "privmx/endpoint/stream/webrtc/Types.hpp" +#include + +namespace privmx { +namespace endpoint { +namespace stream { + +enum TrackAction { + REMOVED, + ADDED +}; +enum DataType { + VIDEO, + AUDIO, + PLAIN +}; +struct Track { + DataType kind; + std::vector streamIds; + std::string trackId; + bool muted; + std::function updateMute; +}; +struct TrackEvent { + std::string id; + std::optional track; + Stream stream; +}; +struct Data { + Data(DataType _type, const std::vector _streamIds, const std::string _track) + : type(_type), streamIds(_streamIds), track(_track) {} + virtual ~Data() = default; + DataType type; + const std::vector streamIds; + const std::string track; +}; +struct VideoData : public Data { + VideoData(DataType _type, const std::vector& _streamIds, const std::string _track, const int64_t _w, const int64_t _h, std::shared_ptr _frameData ) + : Data(_type, _streamIds, _track), w(_w), h(_h), frameData(_frameData) {} + const int64_t w; + const int64_t h; + std::shared_ptr frameData; +}; +struct AudioData : public Data { + AudioData(DataType _type, const std::vector& _streamIds, const std::string _track, const void* _audio_data, int _bits_per_sample, int _sample_rate, size_t _number_of_channels, size_t _number_of_frames) + : Data(_type, _streamIds, _track), audio_data(_audio_data), bits_per_sample(_bits_per_sample), sample_rate(_sample_rate), number_of_channels(_number_of_channels), number_of_frames(_number_of_frames) {} + const void* audio_data; + int bits_per_sample; + int sample_rate; + size_t number_of_channels; + size_t number_of_frames; +}; +struct PlainData : public Data { + +}; + +class OnTrackInterface { +public: + virtual void OnRemoteTrack(Track tack, TrackAction action) = 0; + virtual void OnData(std::shared_ptr data) = 0; +protected: + virtual ~OnTrackInterface() = default; +}; + +} // namespace stream +} // namespace endpoint +} // namespace privmx + +#endif // _PRIVMXLIB_ENDPOINT_STREAM_ONTRACKINTERFACE_HPP diff --git a/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/webrtc/Types.hpp b/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/webrtc/Types.hpp index 7d0502ff..ad400b93 100644 --- a/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/webrtc/Types.hpp +++ b/endpoint/stream/webrtc/include_pub/privmx/endpoint/stream/webrtc/Types.hpp @@ -18,6 +18,10 @@ namespace privmx { namespace endpoint { namespace stream { +struct StreamSettings { + Settings settings; + bool dropCorruptedFrames = true; +}; diff --git a/endpoint/stream/webrtc/src/PeerConnectionManager.cpp b/endpoint/stream/webrtc/src/PeerConnectionManager.cpp index f523c1c7..ea59cd64 100644 --- a/endpoint/stream/webrtc/src/PeerConnectionManager.cpp +++ b/endpoint/stream/webrtc/src/PeerConnectionManager.cpp @@ -11,6 +11,7 @@ limitations under the License. #include "privmx/endpoint/stream/PeerConnectionManager.hpp" #include "privmx/endpoint/stream/StreamException.hpp" +#include using namespace privmx::endpoint::stream; @@ -20,9 +21,11 @@ PeerConnectionManager::PeerConnectionManager( ) : _createPeerConnection(createPeerConnection), _onTrickle(onTrickle) {} void PeerConnectionManager::initialize(const std::string& streamRoomId, ConnectionType connectionType, const int64_t sessionId) { + LOG_DEBUG("PeerConnectionManager::initialize") if(_connections.has(streamRoomId)) { auto roomConnections = _connections.get(streamRoomId).value(); if(roomConnections[connectionType]) { + return; throw PeerConnectionAlreadyInitializedException(); } } @@ -66,7 +69,25 @@ bool PeerConnectionManager::hasConnection(const std::string& streamRoomId, Conne std::shared_ptr PeerConnectionManager::getConnectionWithSession(const std::string& streamRoomId, ConnectionType connectionType) { if(!_connections.has(streamRoomId) || !_connections.get(streamRoomId).value()[connectionType]) { + LOG_TRACE("PeerConnectionManager::getConnectionWithSession ", "streamRoom - require initialize") initialize(streamRoomId, connectionType); } + LOG_TRACE("PeerConnectionManager::getConnectionWithSession ", "get form map") return _connections.get(streamRoomId).value()[connectionType]; } + +void PeerConnectionManager::closeConnection(const std::string& streamRoomId, ConnectionType connectionType) { + if(!_connections.has(streamRoomId) || !_connections.get(streamRoomId).value()[connectionType]) { + return; + } + auto jc = _connections.get(streamRoomId).value()[connectionType]; + jc->peerConnection->audioTracks.clear(); + jc->peerConnection->videoTracks.clear(); + jc->peerConnection->pc->Close(); + jc->peerConnection->observer.reset(); + _connections.get(streamRoomId).value().erase(connectionType); +} + +void PeerConnectionManager::closeSession(const std::string& streamRoomId) { + _connections.erase(streamRoomId); +} diff --git a/endpoint/stream/webrtc/src/PmxPeerConnectionObserver.cpp b/endpoint/stream/webrtc/src/PmxPeerConnectionObserver.cpp index 20998f1c..52b390c8 100644 --- a/endpoint/stream/webrtc/src/PmxPeerConnectionObserver.cpp +++ b/endpoint/stream/webrtc/src/PmxPeerConnectionObserver.cpp @@ -11,7 +11,7 @@ limitations under the License. #include "privmx/endpoint/stream/PmxPeerConnectionObserver.hpp" #include #include -#include +#include using namespace privmx::endpoint::stream; @@ -27,8 +27,14 @@ PmxPeerConnectionObserver::PmxPeerConnectionObserver( libwebrtc::scoped_refptr peerConnectionFactory, const std::string& streamRoomId, std::shared_ptr keys, - const privmx::webrtc::FrameCryptorOptions& options -) : _peerConnectionFactory(peerConnectionFactory), _streamRoomId(streamRoomId), _currentKeys(keys), _options(options) {} + const privmx::webrtc::FrameCryptorOptions& options, + std::shared_ptr onTrackInterface +) : +_peerConnectionFactory(peerConnectionFactory), +_streamRoomId(streamRoomId), +_currentKeys(keys), +_options(options), +_onTrackInterface(onTrackInterface) {} void PmxPeerConnectionObserver::OnSignalingState([[maybe_unused]] libwebrtc::RTCSignalingState state) { std::map map = { @@ -39,8 +45,7 @@ void PmxPeerConnectionObserver::OnSignalingState([[maybe_unused]] libwebrtc::RTC {libwebrtc::RTCSignalingState::RTCSignalingStateHaveRemotePrAnswer, "RTCSignalingStateHaveRemotePrAnswer"}, {libwebrtc::RTCSignalingState::RTCSignalingStateClosed, "RTCSignalingStateClosed"} }; - PRIVMX_DEBUG("STREAMS", "API", _streamRoomId + ": ON SIGNALING STATE " + map[state]) - if(_onSignalingState.has_value()) _onSignalingState.value()(state); + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": ON SIGNALING STATE " + map[state]) } void PmxPeerConnectionObserver::OnPeerConnectionState([[maybe_unused]] libwebrtc::RTCPeerConnectionState state) { std::map map = { @@ -51,8 +56,7 @@ void PmxPeerConnectionObserver::OnPeerConnectionState([[maybe_unused]] libwebrtc {libwebrtc::RTCPeerConnectionState::RTCPeerConnectionStateFailed, "RTCPeerConnectionStateFailed"}, {libwebrtc::RTCPeerConnectionState::RTCPeerConnectionStateClosed, "RTCPeerConnectionStateClosed"} }; - PRIVMX_DEBUG("STREAMS", "API", _streamRoomId + ": ON PEER CONNECTION STATE " + map[state]) - if(_onPeerConnectionState.has_value()) _onPeerConnectionState.value()(state); + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": ON PEER CONNECTION STATE " + map[state]) } void PmxPeerConnectionObserver::OnIceGatheringState([[maybe_unused]] libwebrtc::RTCIceGatheringState state) { std::map map = { @@ -60,8 +64,7 @@ void PmxPeerConnectionObserver::OnIceGatheringState([[maybe_unused]] libwebrtc:: {libwebrtc::RTCIceGatheringState::RTCIceGatheringStateGathering, "RTCIceGatheringStateGathering"}, {libwebrtc::RTCIceGatheringState::RTCIceGatheringStateComplete, "RTCIceGatheringStateComplete"} }; - PRIVMX_DEBUG("STREAMS", "API", _streamRoomId + ": ON ICE GATHERING STATE " + map[state]) - if(_onIceGatheringState.has_value()) _onIceGatheringState.value()(state); + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": ON ICE GATHERING STATE " + map[state]) } void PmxPeerConnectionObserver::OnIceConnectionState([[maybe_unused]] libwebrtc::RTCIceConnectionState state) { @@ -75,70 +78,93 @@ void PmxPeerConnectionObserver::OnIceConnectionState([[maybe_unused]] libwebrtc: {libwebrtc::RTCIceConnectionState::RTCIceConnectionStateClosed, "RTCIceConnectionStateClosed"}, {libwebrtc::RTCIceConnectionState::RTCIceConnectionStateMax, "RTCIceConnectionStateMax"} }; - PRIVMX_DEBUG("STREAMS", "API", _streamRoomId + ": ON ICE CONNECTION STATE " + map[state]) - if(_onIceConnectionState.has_value()) _onIceConnectionState.value()(state); + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": ON ICE CONNECTION STATE " + map[state]) } void PmxPeerConnectionObserver::OnIceCandidate([[maybe_unused]] libwebrtc::scoped_refptr candidate) { if(_onIceCandidate.has_value()) _onIceCandidate.value()(candidate); } void PmxPeerConnectionObserver::OnAddStream(libwebrtc::scoped_refptr stream) { - PRIVMX_DEBUG("STREAMS", "API", _streamRoomId + ": STREAM ADDED") - PRIVMX_DEBUG("STREAMS", "API", "stream->video_tracks().size() -> " + std::to_string(stream->video_tracks().size())) - PRIVMX_DEBUG("STREAMS", "API", "stream->audio_tracks().size() -> " + std::to_string(stream->audio_tracks().size())) - PRIVMX_DEBUG("STREAMS", "API", "_onFrameCallback.has_value() -> " + std::to_string(_onFrameCallback.has_value())) - for(size_t i = 0; i < stream->video_tracks().size(); i++) { - auto track = stream->video_tracks()[i]; - if(_onFrameCallback.has_value()) { - RTCVideoRendererImpl>* r { - new RTCVideoRendererImpl>(_onFrameCallback.value(), _streamRoomId + "-" + track->id().std_string()) - }; - PRIVMX_DEBUG("STREAMS", "API", "stream->video_tracks()[i] -> AddRenderer(r)") - track->AddRenderer(r); - } - } - if(_onAddStream.has_value()) _onAddStream.value()(stream); + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": STREAM ADDED") + LOG_DEBUG("STREAMS ", "API ", "stream->video_tracks().size() -> " + std::to_string(stream->video_tracks().size())) + LOG_DEBUG("STREAMS ", "API ", "stream->audio_tracks().size() -> " + std::to_string(stream->audio_tracks().size())) } void PmxPeerConnectionObserver::OnRemoveStream([[maybe_unused]] libwebrtc::scoped_refptr stream) { - PRIVMX_DEBUG("STREAMS", "API", _streamRoomId + ": ON REMOVE STREAM") - if(_onRemoveStream.has_value()) _onRemoveStream.value()(stream); + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": ON REMOVE STREAM") } void PmxPeerConnectionObserver::OnDataChannel([[maybe_unused]] libwebrtc::scoped_refptr data_channel) { - if(_onDataChannel.has_value()) _onDataChannel.value()(data_channel); + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": ON DATA CHANNEL") } void PmxPeerConnectionObserver::OnRenegotiationNeeded() { - PRIVMX_DEBUG("STREAMS", "API", _streamRoomId + ": ON RENEGOTIATION NEEDED") - if(_onRenegotiationNeeded.has_value()) _onRenegotiationNeeded.value()(); + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": ON RENEGOTIATION NEEDED") }; void PmxPeerConnectionObserver::OnTrack([[maybe_unused]] libwebrtc::scoped_refptr transceiver) { - PRIVMX_DEBUG("STREAMS", "API", _streamRoomId + ": ON TRACK") - if(_onTrack.has_value()) _onTrack.value()(transceiver); + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": ON TRACK") + LOG_TRACE("STREAMS ", "API ", _streamRoomId + ": ON TRACK done") } void PmxPeerConnectionObserver::OnAddTrack([[maybe_unused]] libwebrtc::vector> streams, [[maybe_unused]] libwebrtc::scoped_refptr receiver) { - PRIVMX_DEBUG("STREAMS", "API", _streamRoomId + ": TRACK ADDED") + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": ON ADD TRACK") + // set frame crypto to decrypt track _frameCryptors.set( receiver->track()->id().std_string(), privmx::webrtc::FrameCryptorFactory::frameCryptorFromRtpReceiver(_peerConnectionFactory ,receiver, _currentKeys, _options) ); - if(_onAddTrack.has_value()) _onAddTrack.value()(streams, receiver); - if(receiver->media_type() == libwebrtc::RTCMediaType::VIDEO && _onVideoTrack.has_value()) { - _onVideoTrack.value()(_streamRoomId + "-" + receiver->track()->id().std_string()); + + DataType dataType = receiver->track()->kind().std_string() == "video" ? DataType::VIDEO : DataType::AUDIO; + auto track = receiver->track(); + std::vector streamIds; + LOG_TRACE("PmxPeerConnectionObserver::OnAddTrack streamIds.size(): ", receiver->stream_ids().std_vector().size()) + for(const auto& streamId: receiver->stream_ids().std_vector()) { + LOG_TRACE("PmxPeerConnectionObserver::OnAddTrack streamId: ", streamId.std_string()) + streamIds.push_back(streamId.std_string()); } + // callback on track + if(_onTrackInterface) { + _onTrackInterface->OnRemoteTrack(Track{dataType, streamIds, track->id().std_string(), !track->enabled(), [track](bool mute) {return track->set_enabled(!mute);}}, TrackAction::ADDED); + } + // callback on data + if(_onTrackInterface) { + if(dataType == DataType::VIDEO) { + auto videoTrack = static_cast(track.get()); + std::shared_ptr>> renderer = + std::make_shared>>( + _onTrackInterface, streamIds, track + ); + _RTCVideoRenderers.set(videoTrack->id().std_string(), renderer); + videoTrack->AddRenderer(renderer.get()); + } else if(dataType == DataType::AUDIO) { + auto audioTrack = static_cast(track.get()); + std::shared_ptr sink = + std::make_shared( + _onTrackInterface, streamIds, track + ); + _audioTrackSinks.set(audioTrack->id().std_string(), sink); + audioTrack->SetSink(sink); + }; + } + LOG_TRACE("STREAMS ", "API ", _streamRoomId + ": ON ADD TRACK done") } + void PmxPeerConnectionObserver::OnRemoveTrack([[maybe_unused]] libwebrtc::scoped_refptr receiver) { - PRIVMX_DEBUG("STREAMS", "API", _streamRoomId + ": ON REMOVE TRACK") + LOG_DEBUG("STREAMS ", "API ", _streamRoomId + ": ON REMOVE TRACK") _frameCryptors.erase(receiver->track()->id().std_string()); - if(_onRemoveTrack.has_value()) _onRemoveTrack.value()(receiver); - if(receiver->media_type() == libwebrtc::RTCMediaType::VIDEO && _onRemoveVideoTrack.has_value()) { - _onRemoveVideoTrack.value()(_streamRoomId + "-" + receiver->track()->id().std_string()); + DataType dataType = receiver->track()->kind().std_string() == "video" ? DataType::VIDEO : DataType::AUDIO; + if(_onTrackInterface) { + std::vector streamIds; + for(const auto& streamId: receiver->stream_ids().std_vector()) streamIds.push_back(streamId.std_string()); + auto track = receiver->track(); + _onTrackInterface->OnRemoteTrack(Track{dataType, streamIds, receiver->track()->id().std_string(), !receiver->track()->enabled(), [track](bool mute) {return track->set_enabled(mute);}}, TrackAction::ADDED); + if(dataType == DataType::AUDIO) _audioTrackSinks.erase(receiver->track()->id().std_string()); + if(dataType == DataType::VIDEO) _RTCVideoRenderers.erase(receiver->track()->id().std_string()); } } void PmxPeerConnectionObserver::UpdateCurrentKeys(std::shared_ptr newKeys) { _currentKeys = newKeys; - PRIVMX_DEBUG("STREAMS", "PmxPeerConnectionObserver", "PmxPeerConnectionObserver::UpdateCurrentKeys _frameCryptors.size()=" + std::to_string(_frameCryptors.size())); + + LOG_DEBUG("STREAMS", "PmxPeerConnectionObserver", "PmxPeerConnectionObserver::UpdateCurrentKeys _frameCryptors.size()=" + std::to_string(_frameCryptors.size())); _frameCryptors.forAll([&]([[maybe_unused]]const std::string &id, const std::shared_ptr &frameCryptor) { - PRIVMX_DEBUG("STREAMS", "PmxPeerConnectionObserver", "PmxPeerConnectionObserver::UpdateCurrentKeys::forAll::single"); + LOG_DEBUG("STREAMS", "PmxPeerConnectionObserver", "PmxPeerConnectionObserver::UpdateCurrentKeys::forAll::single"); frameCryptor->setKeyStore(_currentKeys); }); } @@ -150,4 +176,13 @@ void PmxPeerConnectionObserver::SetFrameCryptorOptions(privmx::webrtc::FrameCryp }); } +void PmxPeerConnectionObserver::setOnTrackInterface(std::shared_ptr onTrackInterface) { + _onTrackInterface = onTrackInterface; + _RTCVideoRenderers.forAll([onTrackInterface]([[maybe_unused]] const std::string& trackId, const std::shared_ptr>>& r) { + r->updateOnTrackInterface(onTrackInterface); + }); + _audioTrackSinks.forAll([onTrackInterface]([[maybe_unused]] const std::string& trackId, const std::shared_ptr& s) { + s->updateOnTrackInterface(onTrackInterface); + }); +} diff --git a/endpoint/stream/webrtc/src/StreamApi.cpp b/endpoint/stream/webrtc/src/StreamApi.cpp index f83ff5e8..c2b7ef08 100644 --- a/endpoint/stream/webrtc/src/StreamApi.cpp +++ b/endpoint/stream/webrtc/src/StreamApi.cpp @@ -181,7 +181,7 @@ std::vector StreamApi::getMediaDevices() { } } -void StreamApi::addTrack(const StreamHandle& streamHandle, const MediaDevice& track) { +MediaTrack StreamApi::addTrack(const StreamHandle& streamHandle, const MediaDevice& track) { validateEndpoint(); try { return _impl->addTrack(streamHandle, track); @@ -304,6 +304,16 @@ void StreamApi::dropBrokenFrames(const std::string& streamRoomId, bool enable) { } } +void StreamApi::setOnTrackInterface(std::shared_ptr onTrackInterface) { + validateEndpoint(); + try { + return _impl->setOnTrackInterface(onTrackInterface); + } catch (const privmx::utils::PrivmxException& e) { + core::ExceptionConverter::rethrowAsCoreException(e); + throw core::Exception("ExceptionConverter rethrow error"); + } +} + void StreamApi::validateEndpoint() { if(!_impl) throw NotInitializedException(); } \ No newline at end of file diff --git a/endpoint/stream/webrtc/src/StreamApiImpl.cpp b/endpoint/stream/webrtc/src/StreamApiImpl.cpp index 925dbc81..30a7a51f 100644 --- a/endpoint/stream/webrtc/src/StreamApiImpl.cpp +++ b/endpoint/stream/webrtc/src/StreamApiImpl.cpp @@ -27,8 +27,6 @@ limitations under the License. #include #include #include -#include -#include #include using namespace privmx::endpoint; @@ -70,6 +68,7 @@ std::vector StreamApiImpl::listStreams(const std::string& streamRoom void StreamApiImpl::joinStreamRoom(const std::string& streamRoomId) { _api->joinStreamRoom(streamRoomId, _webRTC); } + void StreamApiImpl::leaveStreamRoom(const std::string& streamRoomId) { _api->leaveStreamRoom(streamRoomId); } @@ -83,13 +82,15 @@ StreamHandle StreamApiImpl::createStream(const std::string& streamRoomId) { _streamDataMap.set( streamHandle, std::make_shared( - privmx::utils::ThreadSaveMap>(), - StreamStatus::Online, + privmx::utils::ThreadSaveMap>(), + privmx::utils::ThreadSaveMap>(), + StreamStatus::Offline, streamRoomId ) ); return streamHandle; } + std::vector StreamApiImpl::getMediaDevices() { std::vector result; std::string name, deviceId; @@ -100,39 +101,41 @@ std::vector StreamApiImpl::getMediaDevices() { uint32_t audio_num = audioDevice->RecordingDevices(); for (uint32_t i = 0; i < audio_num; ++i) { audioDevice->RecordingDeviceName(i, (char*)name.data(), (char*)deviceId.data()); - result.push_back(MediaDevice{name, deviceId, DeviceType::Audio}); + result.push_back(MediaDevice{getTrimmedString(name), getTrimmedString(deviceId), DeviceType::Audio}); } // Video libwebrtc::scoped_refptr videoDevice = _peerConnectionFactory->GetVideoDevice(); uint32_t video_num = videoDevice->NumberOfDevices(); for (uint32_t i = 0; i < video_num; ++i) { videoDevice->GetDeviceName(i, (char*)name.data(), name.size(), (char*)deviceId.data(), deviceId.size()); - result.push_back(MediaDevice{name, deviceId, DeviceType::Video}); + result.push_back(MediaDevice{getTrimmedString(name), getTrimmedString(deviceId), DeviceType::Video}); } // Desktop libwebrtc::scoped_refptr desktopDevice = _peerConnectionFactory->GetDesktopDevice(); result.push_back(MediaDevice{"desktop", "desktop", DeviceType::Desktop}); return result; } -void StreamApiImpl::addTrack(const StreamHandle& streamHandle, const MediaDevice& track) { + +MediaTrack StreamApiImpl::addTrack(const StreamHandle& streamHandle, const MediaDevice& mediaDevice) { + + auto streamDataOpt = _streamDataMap.get(streamHandle); + if(!streamDataOpt.has_value()) { + throw IncorrectStreamHandleException(); + } + auto streamData = streamDataOpt.value(); std::string name, deviceId; name.resize(255); deviceId.resize(255); - switch (track.type) { + switch (mediaDevice.type) { case DeviceType::Audio: - { - auto streamDataOpt = _streamDataMap.get(streamHandle); - if(!streamDataOpt.has_value()) { - throw IncorrectStreamHandleException(); - } - auto streamData = streamDataOpt.value(); + {; libwebrtc::scoped_refptr audioDevice = _peerConnectionFactory->GetAudioDevice(); uint32_t num = audioDevice->RecordingDevices(); std::optional id; for (uint32_t i = 0; i < num; ++i) { audioDevice->RecordingDeviceName(i, (char*)name.data(), (char*)deviceId.data()); - if(name == track.name && deviceId == track.id) { + if(getTrimmedString(name) == mediaDevice.name && getTrimmedString(deviceId) == mediaDevice.id) { id = i; break; } @@ -143,24 +146,24 @@ void StreamApiImpl::addTrack(const StreamHandle& streamHandle, const MediaDevice audioDevice->SetRecordingDevice(id.value()); auto audioSource = _peerConnectionFactory->CreateAudioSource("audio_source"); auto audioTrack = _peerConnectionFactory->CreateAudioTrack(audioSource, "audio_track"); - audioTrack->SetVolume(10); - // Add tracks to the peer connection - _webRTC->AddAudioTrack(streamData->streamRoomId, audioTrack, std::to_string(id.value())); + std::lock_guard lock(streamData->streamMutex); + streamData->audioTracks.set( + mediaDevice.name + "-" + mediaDevice.id, + std::make_shared(audioDevice, mediaDevice.name, mediaDevice.id, audioSource, audioTrack, TrackStatus::ToAdd) + ); + return MediaTrack{[audioTrack](bool enabled) { + audioTrack->set_enabled(enabled); + }}; } break; case DeviceType::Video: { - auto streamDataOpt = _streamDataMap.get(streamHandle); - if(!streamDataOpt.has_value()) { - throw IncorrectStreamHandleException(); - } - auto streamData = streamDataOpt.value(); libwebrtc::scoped_refptr videoDevice = _peerConnectionFactory->GetVideoDevice(); uint32_t num = videoDevice->NumberOfDevices(); std::optional id; for (uint32_t i = 0; i < num; ++i) { videoDevice->GetDeviceName(i, (char*)name.data(), name.size(), (char*)deviceId.data(), deviceId.size()); - if(name == track.name && deviceId == track.id) { + if(getTrimmedString(name) == mediaDevice.name && getTrimmedString(deviceId) == mediaDevice.id) { id = i; break; } @@ -171,59 +174,226 @@ void StreamApiImpl::addTrack(const StreamHandle& streamHandle, const MediaDevice libwebrtc::scoped_refptr videoCapturer = videoDevice->Create("video_capturer", id.value(), 1280, 720, 30); libwebrtc::scoped_refptr videoSource = _peerConnectionFactory->CreateVideoSource(videoCapturer, "video_source", _constraints); libwebrtc::scoped_refptr videoTrack = _peerConnectionFactory->CreateVideoTrack(videoSource, "video_track"); - // Add tracks to the peer connection - _webRTC->AddVideoTrack(streamData->streamRoomId, videoTrack, std::to_string(id.value())); std::lock_guard lock(streamData->streamMutex); - streamData->streamCapturers.set(id.value(), videoCapturer); - if(streamData->status == StreamStatus::Online) { - videoCapturer->StartCapture(); - } + streamData->videoTracks.set( + mediaDevice.name + "-" + mediaDevice.id, + std::make_shared(videoDevice, mediaDevice.name, mediaDevice.id, videoCapturer, videoSource, videoTrack, TrackStatus::ToAdd) + ); + return MediaTrack{[videoTrack](bool enabled) { + videoTrack->set_enabled(enabled); + }}; } break; case DeviceType::Desktop: { - throw NotImplementedException(); + auto streamDataOpt = _streamDataMap.get(streamHandle); + if(!streamDataOpt.has_value()) { + throw IncorrectStreamHandleException(); + } + auto streamData = streamDataOpt.value(); + libwebrtc::scoped_refptr desktopDevice = _peerConnectionFactory->GetDesktopDevice(); + auto desktopMediaList = desktopDevice->GetDesktopMediaList(libwebrtc::DesktopType::kScreen); + desktopMediaList->UpdateSourceList(true, true); + libwebrtc::scoped_refptr desktopCapturer = desktopDevice->CreateDesktopCapturer(desktopMediaList->GetSource(0)); + libwebrtc::scoped_refptr videoSource = _peerConnectionFactory->CreateDesktopSource(desktopCapturer, "desktop_source", _constraints); + libwebrtc::scoped_refptr videoTrack = _peerConnectionFactory->CreateVideoTrack(videoSource, "desktop_track"); + std::lock_guard lock(streamData->streamMutex); + streamData->desktopTracks.set( + mediaDevice.name + "-" + mediaDevice.id, + std::make_shared(desktopDevice, mediaDevice.name, mediaDevice.id, desktopCapturer, videoSource, videoTrack, TrackStatus::ToAdd) + ); + return MediaTrack{[videoTrack](bool enabled) { + videoTrack->set_enabled(enabled); + }}; } break; default: throw NotImplementedException(); } } -void StreamApiImpl::removeTrack(const StreamHandle& streamHandle, const MediaDevice& track) { +void StreamApiImpl::removeTrack(const StreamHandle& streamHandle, const MediaDevice& mediaDevice) { + + auto streamDataOpt = _streamDataMap.get(streamHandle); + if(!streamDataOpt.has_value()) { + throw IncorrectStreamHandleException(); + } + auto streamData = streamDataOpt.value(); + + switch (mediaDevice.type) { + case DeviceType::Audio: + { + LOG_INFO("StreamApiImpl::removeTrack Audio - ", mediaDevice.name + "-" + mediaDevice.id) + std::lock_guard lock(streamData->streamMutex); + auto trackOpt = streamData->audioTracks.get(mediaDevice.name + "-" + mediaDevice.id); + if(!trackOpt.has_value()) { + throw IncorrectTrackIdException(); + } + auto track = trackOpt.value(); + if(track->status == TrackStatus::ToAdd) { + streamData->audioTracks.erase(mediaDevice.name + "-" + mediaDevice.id); + } else if(track->status == TrackStatus::Published) { + track->status = TrackStatus::ToRemove; + streamData->audioTracks.set(mediaDevice.name + "-" + mediaDevice.id, track); + } + } + break; + case DeviceType::Video: + { + + LOG_INFO("StreamApiImpl::removeTrack Video - ", mediaDevice.name + "-" + mediaDevice.id) + std::lock_guard lock(streamData->streamMutex); + auto trackOpt = streamData->videoTracks.get(mediaDevice.name + "-" + mediaDevice.id); + if(!trackOpt.has_value()) { + throw IncorrectTrackIdException(); + } + auto track = trackOpt.value(); + if(track->status == TrackStatus::ToAdd) { + streamData->videoTracks.erase(mediaDevice.name + "-" + mediaDevice.id); + } else if(track->status == TrackStatus::Published) { + track->status = TrackStatus::ToRemove; + streamData->videoTracks.set(mediaDevice.name + "-" + mediaDevice.id, track); + } + } + break; + case DeviceType::Desktop: + { + LOG_INFO("StreamApiImpl::removeTrack Desktop - ", mediaDevice.name + "-" + mediaDevice.id) + std::lock_guard lock(streamData->streamMutex); + auto trackOpt = streamData->desktopTracks.get(mediaDevice.name + "-" + mediaDevice.id); + if(!trackOpt.has_value()) { + throw IncorrectTrackIdException(); + } + auto track = trackOpt.value(); + if(track->status == TrackStatus::ToAdd) { + streamData->desktopTracks.erase(mediaDevice.name + "-" + mediaDevice.id); + } else if(track->status == TrackStatus::Published) { + track->status = TrackStatus::ToRemove; + streamData->desktopTracks.set(mediaDevice.name + "-" + mediaDevice.id, track); + } + } + break; + default: + throw NotImplementedException(); + } } + StreamPublishResult StreamApiImpl::publishStream(const StreamHandle& streamHandle) { + auto streamDataOpt = _streamDataMap.get(streamHandle); + if(!streamDataOpt.has_value()) { + throw IncorrectStreamHandleException(); + } + auto streamData = streamDataOpt.value(); + // Add tracks to the peer connection + std::vector>> audioTracksToAdd; + std::vector>> videoTracksToAdd; + streamData->audioTracks.forAll([&](const std::string& id,const std::shared_ptr& audio) { + if(audio->status == TrackStatus::ToAdd) { + audio->status = TrackStatus::Published; + audioTracksToAdd.push_back({id, audio->track}); + } + }); + streamData->videoTracks.forAll([&](const std::string& id,const std::shared_ptr& video) { + if(video->status == TrackStatus::ToAdd) { + video->status = TrackStatus::Published; + if(!video->capturer->CaptureStarted()) { + video->capturer->StartCapture(); + } + videoTracksToAdd.push_back({id, video->track}); + } + }); + streamData->desktopTracks.forAll([&](const std::string& id,const std::shared_ptr& desktop) { + if(desktop->status == TrackStatus::ToAdd) { + desktop->status = TrackStatus::Published; + if(!desktop->capturer->IsRunning()) { + desktop->capturer->Start(15); + } + videoTracksToAdd.push_back({id, desktop->track}); + } + }); + streamData->status = Online; + _webRTC->createPeerConnectionWithLocalStream(streamData->streamRoomId, audioTracksToAdd, videoTracksToAdd); return _api->publishStream(streamHandle); } StreamPublishResult StreamApiImpl::updateStream(const StreamHandle& streamHandle) { + auto streamDataOpt = _streamDataMap.get(streamHandle); + if(!streamDataOpt.has_value()) { + throw IncorrectStreamHandleException(); + } + auto streamData = streamDataOpt.value(); + + // Add tracks to the peer connection + // UPDATE audio tracks + std::vector>> audioTracksToAdd; + std::vector>> audioTracksToRemove; + streamData->audioTracks.forAll([&](const std::string& id,const std::shared_ptr& audio) { + if(audio->status == TrackStatus::ToAdd) { + audio->status = TrackStatus::Published; + audioTracksToAdd.push_back({id, audio->track}); + } else if(audio->status == TrackStatus::ToRemove) { + audioTracksToRemove.push_back({id, audio->track}); + } + }); + for(const auto& toRemove : audioTracksToRemove) { + streamData->audioTracks.erase(toRemove.first); + } + // UPDATE video tracks + std::vector>> videoTracksToAdd; + std::vector>> videoTracksToRemove; + streamData->videoTracks.forAll([&](const std::string& id,const std::shared_ptr& video) { + if(video->status == TrackStatus::ToAdd) { + if(!video->capturer->CaptureStarted()) video->capturer->StartCapture(); + video->status = TrackStatus::Published; + videoTracksToAdd.push_back({id, video->track}); + } else if(video->status == TrackStatus::ToRemove) { + if(video->capturer->CaptureStarted()) video->capturer->StopCapture(); + videoTracksToRemove.push_back({id, video->track}); + } + }); + size_t toRemove = 0; + for(; toRemove < videoTracksToRemove.size(); toRemove++ ) { + streamData->videoTracks.erase(videoTracksToRemove[toRemove].first); + } + streamData->desktopTracks.forAll([&](const std::string& id,const std::shared_ptr& desktop) { + if(desktop->status == TrackStatus::ToAdd) { + if(!desktop->capturer->IsRunning()) desktop->capturer->Start(15); + desktop->status = TrackStatus::Published; + videoTracksToAdd.push_back({id, desktop->track}); + } else if(desktop->status == TrackStatus::ToRemove) { + if(desktop->capturer->IsRunning()) desktop->capturer->Stop(); + videoTracksToRemove.push_back({id, desktop->track}); + } + }); + for(; toRemove < videoTracksToRemove.size(); toRemove++ ) { + streamData->desktopTracks.erase(videoTracksToRemove[toRemove].first); + } + _webRTC->updatePeerConnectionWithLocalStream(streamData->streamRoomId, audioTracksToAdd, videoTracksToAdd, audioTracksToRemove, videoTracksToRemove); return _api->updateStream(streamHandle); } void StreamApiImpl::unpublishStream(const StreamHandle& streamHandle) { _api->unpublishStream(streamHandle); } + void StreamApiImpl::subscribeToRemoteStreams(const std::string& streamRoomId, const std::vector& subscriptions, const StreamSettings& options) { int64_t streamId = generateNumericId(); _streamDataMap.set( streamId, std::make_shared( - privmx::utils::ThreadSaveMap>(), - StreamStatus::Online, + privmx::utils::ThreadSaveMap>(), + privmx::utils::ThreadSaveMap>(), + StreamStatus::Offline, streamRoomId ) ); - if(options.OnFrame.has_value()) { - _webRTC->setOnFrame(streamRoomId, options.OnFrame.value()); - } - if(options.OnVideoRemove.has_value()) { - _webRTC->setOnRemoveVideoTrack(streamRoomId, options.OnVideoRemove.value()); - } _api->subscribeToRemoteStreams(streamRoomId, subscriptions, options.settings); } + void StreamApiImpl::modifyRemoteStreamsSubscriptions(const std::string& streamRoomId, const std::vector& subscriptionsToAdd, const std::vector& subscriptionsToRemove, const StreamSettings& options) { _api->modifyRemoteStreamsSubscriptions(streamRoomId, subscriptionsToAdd, subscriptionsToRemove, options.settings); } + void StreamApiImpl::unsubscribeFromRemoteStreams(const std::string& streamRoomId, const std::vector& subscriptionsToRemove) { _api->unsubscribeFromRemoteStreams(streamRoomId, subscriptionsToRemove); } @@ -284,4 +454,8 @@ std::string StreamApiImpl::buildSubscriptionQuery(EventType eventType, EventSele void StreamApiImpl::dropBrokenFrames(const std::string& streamRoomId, bool enable) { _frameCryptorOptions = privmx::webrtc::FrameCryptorOptions{.dropFrameIfCryptionFailed=enable}; _webRTC->setFrameCryptorOptions(streamRoomId, _frameCryptorOptions); +} + +void StreamApiImpl::setOnTrackInterface(std::shared_ptr onTrackInterface) { + _webRTC->setOnTrackInterface(onTrackInterface); } \ No newline at end of file diff --git a/endpoint/stream/webrtc/src/WebRTCImpl.cpp b/endpoint/stream/webrtc/src/WebRTCImpl.cpp index 57e05b44..47000414 100644 --- a/endpoint/stream/webrtc/src/WebRTCImpl.cpp +++ b/endpoint/stream/webrtc/src/WebRTCImpl.cpp @@ -1,8 +1,8 @@ #include "privmx/endpoint/stream/WebRTCImpl.hpp" #include "privmx/endpoint/stream/StreamException.hpp" #include "privmx/endpoint/stream/PeerConnectionManager.hpp" -#include #include +#include using namespace privmx::endpoint::stream; @@ -64,7 +64,7 @@ std::string WebRTCImpl::createAnswerAndSetDescriptions(const std::string& stream throw stream::WebRTCException("OnSetSdpFailure " + std::string(error)); } ); - if (!tmp.get_future().get()) { + if (tmp.get_future().wait_for(std::chrono::seconds(5)) == std::future_status::timeout) { throw stream::WebRTCException("SetRemoteDescriptionFailed"); } // Create answer @@ -111,36 +111,29 @@ void WebRTCImpl::setAnswerAndSetRemoteDescription(const std::string& streamRoomI } void WebRTCImpl::close(const std::string& streamRoomId) { - PRIVMX_DEBUG("STREAMS", "WebRTC_IMPL", "WebRTCImpl::close()"); - auto peerConnection_1 = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Publisher)->peerConnection; - peerConnection_1->audioTracks.clear(); - peerConnection_1->videoTracks.clear(); - peerConnection_1->pc->Close(); - auto peerConnection_2 = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Subscriber)->peerConnection; - peerConnection_2->audioTracks.clear(); - peerConnection_2->videoTracks.clear(); - peerConnection_2->pc->Close(); + LOG_DEBUG("STREAMS", "WebRTC_IMPL", "WebRTCImpl::close()"); + _peerConnectionManager->closeConnection(streamRoomId, ConnectionType::Publisher); + _peerConnectionManager->closeConnection(streamRoomId, ConnectionType::Subscriber); + _peerConnectionManager->closeSession(streamRoomId); } void WebRTCImpl::updateKeys(const std::string& streamRoomId, const std::vector& keys) { - { auto peerConnection_p = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Publisher)->peerConnection; auto peerConnection_s = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Subscriber)->peerConnection; std::string keysIds; for(auto k: keys) keysIds += (k.keyId + ", "); - std::cout << "RecivedKyesIds: " << keysIds << std::endl; - PRIVMX_DEBUG("STREAMS", "WebRTC", "updateKeys createWebRtcKeyStore"); + LOG_DEBUG("STREAMS:WebRTC ", "updateKeys createWebRtcKeyStore"); { std::unique_lock lock1(peerConnection_p->trackMutex); std::unique_lock lock2(peerConnection_s->trackMutex); peerConnection_s->keys = createWebRtcKeyStore(keys); peerConnection_p->keys = createWebRtcKeyStore(keys); - PRIVMX_DEBUG("STREAMS", "WebRTC", "updateKeys _peerConnectionObserver->UpdateCurrentKeys"); + LOG_DEBUG("STREAMS:WebRTC ", "updateKeys _peerConnectionObserver->UpdateCurrentKeys"); // update input data keys - peerConnection_p->observer->UpdateCurrentKeys(peerConnection_p->keys); - peerConnection_s->observer->UpdateCurrentKeys(peerConnection_s->keys); + if(peerConnection_p->observer) peerConnection_p->observer->UpdateCurrentKeys(peerConnection_p->keys); + if(peerConnection_s->observer) peerConnection_s->observer->UpdateCurrentKeys(peerConnection_s->keys); } - PRIVMX_DEBUG("STREAMS", "WebRTC", "updateKeys for_each->_audioTracks"); + LOG_DEBUG("STREAMS:WebRTC ", "updateKeys for_each->_audioTracks"); { std::unique_lock lock1(peerConnection_p->trackMutex); std::unique_lock lock2(peerConnection_s->trackMutex); @@ -155,7 +148,7 @@ void WebRTCImpl::updateKeys(const std::string& streamRoomId, const std::vectoraudioTracks.end(), [&](const auto& p) {p.second.frameCryptor->setKeyStore(peerConnection_s->keys);} ); - PRIVMX_DEBUG("STREAMS", "WebRTC", "updateKeys for_each->_videoTracks"); + LOG_DEBUG("STREAMS:WebRTC ", "updateKeys for_each->_videoTracks"); std::for_each( peerConnection_p->videoTracks.begin(), peerConnection_p->videoTracks.end(), @@ -167,25 +160,109 @@ void WebRTCImpl::updateKeys(const std::string& streamRoomId, const std::vectorsetKeyStore(peerConnection_s->keys);} ); } +} + +std::shared_ptr WebRTCImpl::createWebRtcKeyStore(const std::vector& keys) { + std::vector webRtcKeys; + for(size_t i = 0; i < keys.size(); i++) { + webRtcKeys.push_back( + privmx::webrtc::Key{. + keyId=keys[i].keyId, + .key=keys[i].key.stdString(), + .type=keys[i].type == privmx::endpoint::stream::KeyType::REMOTE ? privmx::webrtc::KeyType::REMOTE : privmx::webrtc::KeyType::LOCAL + } + ); + } + return privmx::webrtc::KeyStore::Create(webRtcKeys); +} + +std::shared_ptr WebRTCImpl::createPeerConnection(const std::string& streamRoomId) { + auto peerConnection = std::make_shared(); + peerConnection->pc = _peerConnectionFactory->Create(_configuration, _constraints); + std::string streamId = streamRoomId + "-" + privmx::utils::Utils::getNowTimestampStr(); // TMP + peerConnection->mediaStream = _peerConnectionFactory->CreateStream(streamId); + peerConnection->pc->CreateLocalMediaStream(streamId); + peerConnection->observer = std::make_shared(_peerConnectionFactory, streamRoomId, privmx::webrtc::KeyStore::Create(std::vector()), _frameCryptorOptions); + peerConnection->pc->RegisterRTCPeerConnectionObserver(peerConnection->observer.get()); + peerConnection->observer->setOnTrackInterface(_onTrackInterface); + return peerConnection; +} + +void WebRTCImpl::setFrameCryptorOptions(const std::string& streamRoomId, const privmx::webrtc::FrameCryptorOptions& frameCryptorOptions) { + auto connection = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Subscriber); + connection->peerConnection->observer->SetFrameCryptorOptions(frameCryptorOptions); +} + +void WebRTCImpl::setOnTrackInterface(const std::string& streamRoomId, std::shared_ptr OnTrackInterface) { + auto connection = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Subscriber); + connection->peerConnection->observer->setOnTrackInterface(OnTrackInterface); +} + +void WebRTCImpl::createPeerConnectionWithLocalStream( + const std::string& streamRoomId, + const std::vector>>& audioTracks, + const std::vector>>& videoTracks +) { + _peerConnectionManager->initialize(streamRoomId, ConnectionType::Publisher); + auto jc = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Publisher); + + + for(auto audioTrack: audioTracks) { + AddAudioTrack(jc, audioTrack.second, privmx::utils::Hex::from(audioTrack.first)); + } + for(auto videoTrack: videoTracks) { + AddVideoTrack(jc, videoTrack.second, privmx::utils::Hex::from(videoTrack.first)); + } +} + +void WebRTCImpl::updatePeerConnectionWithLocalStream( + const std::string& streamRoomId, + const std::vector>>& audioTracksToAdd, + const std::vector>>& videoTracksToAdd, + const std::vector>>& audioTracksToRemove, + const std::vector>>& videoTracksToRemove + +) { + _peerConnectionManager->initialize(streamRoomId, ConnectionType::Publisher); + auto jc = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Publisher); + LOG_DEBUG("updatePeerConnectionWithLocalStream") + for(auto audioTrack: audioTracksToRemove) { + LOG_DEBUG("updatePeerConnectionWithLocalStream:audioTracksToRemove - ", audioTrack.first) + RemoveAudioTrack(jc, privmx::utils::Hex::from(audioTrack.first)); + } + for(auto videoTrack: videoTracksToRemove) { + LOG_DEBUG("updatePeerConnectionWithLocalStream:videoTracksToRemove - ", videoTrack.first) + RemoveVideoTrack(jc, privmx::utils::Hex::from(videoTrack.first)); + } + for(auto audioTrack: audioTracksToAdd) { + LOG_DEBUG("updatePeerConnectionWithLocalStream:audioTracksToAdd - ", audioTrack.first) + AddAudioTrack(jc, audioTrack.second, privmx::utils::Hex::from(audioTrack.first)); + } + for(auto videoTrack: videoTracksToAdd) { + LOG_DEBUG("updatePeerConnectionWithLocalStream:videoTracksToAdd - ", videoTrack.first) + AddVideoTrack(jc, videoTrack.second, privmx::utils::Hex::from(videoTrack.first)); } + LOG_DEBUG("updatePeerConnectionWithLocalStream:pc->audioTracks -" , jc->peerConnection->audioTracks.size()); + LOG_DEBUG("updatePeerConnectionWithLocalStream:pc->videoTracks -" , jc->peerConnection->videoTracks.size()); } -void WebRTCImpl::AddAudioTrack(const std::string& streamRoomId, libwebrtc::scoped_refptr audioTrack, std::string id) { - auto connection = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Publisher); - auto sender = connection->peerConnection->pc->AddTrack(audioTrack, libwebrtc::vector{std::vector{id}}); + +void WebRTCImpl::AddAudioTrack(std::shared_ptr jc, libwebrtc::scoped_refptr audioTrack, std::string id) { + jc->peerConnection->mediaStream->AddTrack(audioTrack); + auto sender = jc->peerConnection->pc->AddTrack(audioTrack, libwebrtc::vector{std::vector{jc->peerConnection->mediaStream->id()}}); std::shared_ptr frameCryptor; { - std::shared_lock lock(connection->peerConnection->trackMutex); + std::shared_lock lock(jc->peerConnection->trackMutex); frameCryptor = privmx::webrtc::FrameCryptorFactory::frameCryptorFromRtpSender( _peerConnectionFactory, sender, - connection->peerConnection->keys, + jc->peerConnection->keys, _frameCryptorOptions ); } { - std::unique_lock lock(connection->peerConnection->trackMutex); - connection->peerConnection->audioTracks.insert(std::make_pair( + std::unique_lock lock(jc->peerConnection->trackMutex); + jc->peerConnection->audioTracks.insert(std::make_pair( id, AudioTrackInfo{ .track=audioTrack, @@ -196,22 +273,22 @@ void WebRTCImpl::AddAudioTrack(const std::string& streamRoomId, libwebrtc::scope } } -void WebRTCImpl::AddVideoTrack(const std::string& streamRoomId, libwebrtc::scoped_refptr videoTrack, std::string id) { - auto connection = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Publisher); - auto sender = connection->peerConnection->pc->AddTrack(videoTrack, libwebrtc::vector{std::vector{id}}); +void WebRTCImpl::AddVideoTrack(std::shared_ptr jc, libwebrtc::scoped_refptr videoTrack, std::string id) { + jc->peerConnection->mediaStream->AddTrack(videoTrack); + auto sender = jc->peerConnection->pc->AddTrack(videoTrack, libwebrtc::vector{std::vector{jc->peerConnection->mediaStream->id()}}); std::shared_ptr frameCryptor; { - std::shared_lock lock(connection->peerConnection->trackMutex); + std::shared_lock lock(jc->peerConnection->trackMutex); frameCryptor = privmx::webrtc::FrameCryptorFactory::frameCryptorFromRtpSender( _peerConnectionFactory, sender, - connection->peerConnection->keys, + jc->peerConnection->keys, _frameCryptorOptions ); } { - std::unique_lock lock(connection->peerConnection->trackMutex); - connection->peerConnection->videoTracks.insert(std::make_pair( + std::unique_lock lock(jc->peerConnection->trackMutex); + jc->peerConnection->videoTracks.insert(std::make_pair( id, VideoTrackInfo{ .track=videoTrack, @@ -223,71 +300,31 @@ void WebRTCImpl::AddVideoTrack(const std::string& streamRoomId, libwebrtc::scope } -void WebRTCImpl::RemoveAudioTrack(const std::string& streamRoomId, std::string id) { - auto connection = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Publisher); - std::unique_lock lock(connection->peerConnection->trackMutex); - auto it = connection->peerConnection->audioTracks.find(id); - if(it != connection->peerConnection->audioTracks.end()) { - // _mediaStream->RemoveTrack(it->second.track); - connection->peerConnection->pc->RemoveTrack(it->second.sender); - connection->peerConnection->audioTracks.erase(it); +void WebRTCImpl::RemoveAudioTrack(std::shared_ptr jc, std::string id) { + std::unique_lock lock(jc->peerConnection->trackMutex); + auto it = jc->peerConnection->audioTracks.find(id); + if(it != jc->peerConnection->audioTracks.end()) { + jc->peerConnection->mediaStream->RemoveTrack(it->second.track); + jc->peerConnection->pc->RemoveTrack(it->second.sender); + jc->peerConnection->audioTracks.erase(it); } else { throw IncorrectTrackIdException(); } } -void WebRTCImpl::RemoveVideoTrack(const std::string& streamRoomId, std::string id) { - auto connection = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Publisher); - std::unique_lock lock(connection->peerConnection->trackMutex); - auto it = connection->peerConnection->audioTracks.find(id); - if(it != connection->peerConnection->audioTracks.end()) { - // _mediaStream->RemoveTrack(it->second.track); - connection->peerConnection->pc->RemoveTrack(it->second.sender); - connection->peerConnection->audioTracks.erase(it); +void WebRTCImpl::RemoveVideoTrack(std::shared_ptr jc, std::string id) { + std::unique_lock lock(jc->peerConnection->trackMutex); + auto it = jc->peerConnection->videoTracks.find(id); + if(it != jc->peerConnection->videoTracks.end()) { + jc->peerConnection->mediaStream->RemoveTrack(it->second.track); + jc->peerConnection->pc->RemoveTrack(it->second.sender); + jc->peerConnection->videoTracks.erase(it); } else { throw IncorrectTrackIdException(); } } -std::shared_ptr WebRTCImpl::createWebRtcKeyStore(const std::vector& keys) { - std::vector webRtcKeys; - for(size_t i = 0; i < keys.size(); i++) { - webRtcKeys.push_back( - privmx::webrtc::Key{. - keyId=keys[i].keyId, - .key=keys[i].key.stdString(), - .type=keys[i].type == privmx::endpoint::stream::KeyType::REMOTE ? privmx::webrtc::KeyType::REMOTE : privmx::webrtc::KeyType::LOCAL - } - ); - } - return privmx::webrtc::KeyStore::Create(webRtcKeys); +void WebRTCImpl::setOnTrackInterface(std::shared_ptr onTrackInterface) { + _onTrackInterface = onTrackInterface; } -std::shared_ptr WebRTCImpl::createPeerConnection(const std::string& streamRoomId) { - auto peerConnection = std::make_shared(); - auto tmp = _peerConnectionFactory->Create(_configuration, _constraints); - peerConnection->pc = tmp; - peerConnection->observer = std::make_shared(_peerConnectionFactory, streamRoomId, privmx::webrtc::KeyStore::Create(std::vector()), _frameCryptorOptions); - peerConnection->pc->RegisterRTCPeerConnectionObserver(peerConnection->observer.get()); - return peerConnection; -} - -void WebRTCImpl::setFrameCryptorOptions(const std::string& streamRoomId, const privmx::webrtc::FrameCryptorOptions& frameCryptorOptions) { - auto connection = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Subscriber); - connection->peerConnection->observer->SetFrameCryptorOptions(frameCryptorOptions); -} - -void WebRTCImpl::setOnFrame(const std::string& streamRoomId, std::function, const std::string&)> OnFrame) { - auto connection = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Subscriber); - connection->peerConnection->observer->setOnFrame(OnFrame); -} - -void WebRTCImpl::setOnVideoTrack(const std::string& streamRoomId, std::function OnVideoTrack) { - auto connection = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Subscriber); - connection->peerConnection->observer->setOnVideoTrack(OnVideoTrack); -} - -void WebRTCImpl::setOnRemoveVideoTrack(const std::string& streamRoomId, std::function OnRemoveVideoTrack) { - auto connection = _peerConnectionManager->getConnectionWithSession(streamRoomId, ConnectionType::Subscriber); - connection->peerConnection->observer->setOnRemoveVideoTrack(OnRemoveVideoTrack); -} \ No newline at end of file diff --git a/privfs/src/gateway/RpcGateway.cpp b/privfs/src/gateway/RpcGateway.cpp index 41f0089e..99e7332d 100644 --- a/privfs/src/gateway/RpcGateway.cpp +++ b/privfs/src/gateway/RpcGateway.cpp @@ -73,7 +73,11 @@ RpcGateway::RpcGateway(rpc::AuthorizedConnection::Ptr rpc, std::optionalcall(method, params, settings, token); + try { + return _rpc->call(method, params, settings, token); + } catch (const privmx::rpc::HttpRequestException& e) { + return _rpc->call(method, params, settings, token); + } } throw NotConnectedException(); return Var(); diff --git a/rpc/base/src/AuthorizedConnection.cpp b/rpc/base/src/AuthorizedConnection.cpp index 833c6994..6d6adeb7 100644 --- a/rpc/base/src/AuthorizedConnection.cpp +++ b/rpc/base/src/AuthorizedConnection.cpp @@ -314,9 +314,9 @@ void AuthorizedConnection::authorizeWebsocket() { params->set("addWsChannelId", true); auto wschannel_id = call("authorizeWebSocket", params, {.channel_type = ChannelType::WEBSOCKET}) .extract()->getValue("wsChannelId"); - _wschannel_id = wschannel_id; - LOG_DEBUG("AuthorizedConnection::authorizeWebSocket => notify->add(wschannel_id): ", wschannel_id); - _server_channels->notify->add(wschannel_id, [&, key](const std::string& data){ + _wschannel_id = wschannel_id; + LOG_DEBUG("AuthorizedConnection::authorizeWebSocket => notify->add(wschannel_id): ", wschannel_id); + _server_channels->notify->add(wschannel_id, [&, key](const std::string& data){ string decrypted = crypto::Crypto::aes256CbcHmac256Decrypt(data, key); Pson::Decoder decoder; auto decoded = decoder.decode(decrypted).extract();