Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9890c41
feat: new streamApi for c++ (WIP)
Uriagat Nov 19, 2025
565db24
feat: new streamApi for c++ (WIP) [tracks control when joining]
Uriagat Nov 20, 2025
485d56d
fix: segfault on leaveStreamRoom (part: 1)
Uriagat Nov 21, 2025
f03ec95
feat: update stream [WIP publisher dont remove track from janus]
Uriagat Nov 24, 2025
86bd3fb
feat: track mute + update stream [WIP]
Uriagat Nov 26, 2025
5ebc99e
feat: desktop share
Uriagat Nov 28, 2025
0fcacde
Merge remote-tracking branch 'origin/devel-streams-testing' into deve…
Uriagat Dec 4, 2025
5abc545
fix: streamUpdate
Uriagat Dec 4, 2025
ceb8eab
fix: leaveStreamRoom
Uriagat Dec 4, 2025
2ce707f
fix: preerConnection per trck managment (WIP)
Uriagat Dec 5, 2025
faf8382
debuging: add VideoRenderer on new Track
Uriagat Dec 8, 2025
899b8d7
Merge remote-tracking branch 'origin/devel-streams-testing' into deve…
Uriagat Dec 15, 2025
47ccfc1
fix: missing removeDesktopTrack
Uriagat Dec 15, 2025
d12a692
Merge remote-tracking branch 'origin/devel-streams-testing' into deve…
Uriagat Dec 15, 2025
9922d53
debug: example stream program 3
Uriagat Dec 15, 2025
e5d66dd
fix: PeerConnectionManager closing connection
Uriagat Dec 18, 2025
c0d4a7f
fix: publish stream (not fully tested)
Uriagat Dec 19, 2025
4087eb8
fix: stream empty user list on update
Uriagat Jan 7, 2026
d6830e7
fix: stream programs 3 (debuging)
Uriagat Jan 7, 2026
9f30eeb
Merge remote-tracking branch 'origin/devel-streams-testing' into deve…
Uriagat Jan 8, 2026
ce1de4c
debug: streamApi
Uriagat Jan 8, 2026
575c6f8
feat: OnTrackInterface (WIP)
Uriagat Jan 13, 2026
f522e5f
Merge remote-tracking branch 'origin/devel-streams-testing' into deve…
Uriagat Jan 14, 2026
197bf80
Merge remote-tracking branch 'origin/devel' into devel-streams-cpp-up…
Uriagat Jan 14, 2026
f7e6ab0
feat: OnTrackInterface (WIP)
Uriagat Jan 14, 2026
6633669
fix: PmxPeerConnectionObserver
Uriagat Jan 16, 2026
94064b4
fix: stream_program_3
Uriagat Jan 19, 2026
636c5c2
feat: track mute
Uriagat Jan 20, 2026
b1060aa
fix: StreamKeyManager::KeyTTL
Uriagat Jan 23, 2026
ce55ec8
Merge remote-tracking branch 'origin/devel' into devel-streams-cpp-up…
Uriagat Jan 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions endpoint/programs/stream_testing/streams_program_2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,12 @@ int main(int argc, char** argv) {
auto streamlist = streamApi.listStreams(streamRoomId);
std::vector<stream::StreamSubscription> streamsId;
for(int i = 0; i < streamlist.size(); i++) {
streamsId.push_back(stream::StreamSubscription{streamlist[i].streamId, std::nullopt});
streamsId.push_back(stream::StreamSubscription{streamlist[i].id, std::nullopt});
}
stream::StreamSettings ssettings;
streamApi.subscribeToRemoteStreams(streamRoomId, streamsId, ssettings);

std::this_thread::sleep_for(std::chrono::seconds(120));
streamApi.unpublishStream(streamId_1);
streamApi.unsubscribeFromRemoteStreams(streamRoomId, streamsId);
streamApi.leaveStreamRoom(streamRoomId);
std::this_thread::sleep_for(std::chrono::seconds(60));
std::this_thread::sleep_for(std::chrono::seconds(1));


Expand Down
237 changes: 191 additions & 46 deletions endpoint/programs/stream_testing/streams_program_3_reciver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,168 @@
#include <fstream>
#include <vector>
#include <thread>
#include <memory>
#include <atomic>
#include <privmx/endpoint/core/Exception.hpp>
#include <privmx/endpoint/core/Config.hpp>
#include <privmx/endpoint/core/Connection.hpp>
#include <privmx/endpoint/core/EventQueue.hpp>
#include <privmx/endpoint/event/EventApi.hpp>
#include <privmx/endpoint/stream/StreamApi.hpp>
#include <privmx/endpoint/stream/Events.hpp>
#include <privmx/endpoint/stream/Types.hpp>
#include <privmx/endpoint/stream/webrtc/OnTrackInterface.hpp>
#include <privmx/endpoint/stream/webrtc/Types.hpp>
#include <privmx/utils/PrivmxException.hpp>
#include <privmx/utils/Debug.hpp>
#include <privmx/utils/Logger.hpp>
#include <SDL2/SDL.h>

using namespace std;
using namespace privmx::endpoint;

class RTCVideoRendererImpl {
public:
RTCVideoRendererImpl(const std::string& title) : title("PrivMX Stream - " + title) {}
void OnFrame(int64_t w, int64_t h, std::shared_ptr<privmx::endpoint::stream::Frame> frame, const std::string id) {
if (renderer == NULL) {
window = SDL_CreateWindow(title.c_str(), SDL_WINDOWPOS_CENTERED, SDL_WINDOWPOS_CENTERED, 768, 432, 0);
renderer = SDL_CreateRenderer(window, -1, SDL_RENDERER_ACCELERATED);
texture = SDL_CreateTexture(renderer, SDL_PIXELFORMAT_RGBA8888, SDL_TEXTUREACCESS_STREAMING, 768, 432);
windowEventsLoop();
}
uint32_t* pixels = new uint32_t[768 * 432];
// PRIVMX_DEBUG("RTCVideoRenderer", "OnFrame", "Frame size: "+std::to_string(w) +"-"+std::to_string(h))
frame->ConvertToRGBA((uint8_t*)pixels, 4, 768, 432);
SDL_UpdateTexture(texture, NULL, pixels, 768 * sizeof(uint32_t));
SDL_RenderClear(renderer); SDL_RenderCopy(renderer, texture, NULL, NULL); SDL_RenderPresent(renderer);
delete pixels;

RTCVideoRendererImpl(
const std::string& title, int width, int height
) :
_title("PrivMX Stream - " + title), _width(width), _height(height),
_window(nullptr), _texture(nullptr), _renderer(nullptr),
_framePixels(std::vector<uint32_t>(_width*_height)), _hasNewFrame(false),
_stop(false), _SDLMainThread(std::thread(&RTCVideoRendererImpl::main, this))
{}

~RTCVideoRendererImpl() {
_stop = true;
if (_SDLMainThread.joinable()) _SDLMainThread.join();
}
void OnFrame(int64_t w, int64_t h, std::shared_ptr<privmx::endpoint::stream::Frame> frame, const std::string& id)
{
std::lock_guard<std::mutex> lock(_frameMutex);
frame->ConvertToRGBA(
reinterpret_cast<uint8_t*>(_framePixels.data()),
4, _width, _height
);

_hasNewFrame = true;
}

private:
void windowEventsLoop() {
eventThreadLoop = std::make_shared<std::thread>([&](){
while(true) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
SDL_Event event;
while (SDL_PollEvent(&event)) {
if (event.type == SDL_QUIT) exit(0);
if (event.type == SDL_WINDOWEVENT) {
if (event.window.event == SDL_WINDOWEVENT_CLOSE)
{
exit(1);
}
}
void initializeSDL() {
if (SDL_Init(SDL_INIT_VIDEO | SDL_INIT_TIMER) != 0) {
LOG_FATAL("SDL_Init Error: " , SDL_GetError())
throw "SDL_Error";
}
_window = SDL_CreateWindow(_title.c_str(), SDL_WINDOWPOS_CENTERED, SDL_WINDOWPOS_CENTERED, _width, _height, 0);
if (!_window) {
LOG_FATAL("SDL_CreateWindow Error: " , SDL_GetError())
SDL_Quit();
throw "SDL_Error";
}
_renderer = SDL_CreateRenderer(_window, -1, SDL_RENDERER_ACCELERATED);
if (!_renderer) {
LOG_FATAL("SDL_CreateRenderer Error: " , SDL_GetError())
SDL_DestroyWindow(_window);
SDL_Quit();
throw "SDL_Error";
}
_texture = SDL_CreateTexture(_renderer, SDL_PIXELFORMAT_RGBA8888, SDL_TEXTUREACCESS_STREAMING, _width, _height);
if (!_texture) {
LOG_FATAL("SDL_CreateTexture Error: " , SDL_GetError())
SDL_DestroyRenderer(_renderer);
SDL_DestroyWindow(_window);
SDL_Quit();
throw "SDL_Error";
}
}
void main() {
initializeSDL();
const int frameDelayMs = 1000 / 24; // 24 Frames per second
while (!_stop) {
Uint32 startTicks = SDL_GetTicks64();
SDL_Event event;
while (SDL_PollEvent(&event)) {
if (event.type == SDL_QUIT ||
(event.type == SDL_WINDOWEVENT &&
event.window.event == SDL_WINDOWEVENT_CLOSE)) {
_stop = true;
exit(0);
}
}
});
{
std::lock_guard<std::mutex> lock(_frameMutex);
if (_hasNewFrame) {
SDL_UpdateTexture(
_texture,
nullptr,
_framePixels.data(),
_width * sizeof(uint32_t)
);
_hasNewFrame = false;
}
}
SDL_RenderClear(_renderer);
SDL_RenderCopy(_renderer, _texture, nullptr, nullptr);
SDL_RenderPresent(_renderer);

Uint32 elapsed = SDL_GetTicks64() - startTicks;
if (elapsed < (Uint32)frameDelayMs) SDL_Delay(frameDelayMs - elapsed);
}
if (_texture) SDL_DestroyTexture(_texture);
if (_renderer) SDL_DestroyRenderer(_renderer);
if (_window) SDL_DestroyWindow(_window);
SDL_Quit();
}
SDL_Window* window;
SDL_Texture* texture;
SDL_Renderer* renderer = NULL;
std::string title;
std::shared_ptr<std::thread> eventThreadLoop;

SDL_Window* _window;
SDL_Texture* _texture;
SDL_Renderer* _renderer;

std::string _title;
int _width;
int _height;

std::mutex _frameMutex;
std::vector<uint32_t> _framePixels;
bool _hasNewFrame;

std::atomic_bool _stop;
std::thread _SDLMainThread;
};

static vector<string_view> getParamsList(int argc, char* argv[]) {
vector<string_view> args(argv + 1, argv + argc);
return args;
}

class OnTrackImpl : public stream::OnTrackInterface {
public:
OnTrackImpl() : _renderer(RTCVideoRendererImpl("Remote", 768, 432)) {}
virtual void OnRemoteTrack(stream::Track tack, stream::TrackAction action) override {
if(tack.kind == stream::DataType::AUDIO) {
LOG_INFO("OnRemoteTrack[stream::TrackAction] DataType::AUDIO : ", (action == stream::TrackAction::ADDED ? "ADDED" : "REMOVED"));
if(action == stream::TrackAction::ADDED && !tack.muted) {
tack.updateMute(true);
}
}
if(tack.kind == stream::DataType::VIDEO) {
LOG_INFO("OnRemoteTrack[stream::TrackAction] DataType::VIDEO : ", (action == stream::TrackAction::ADDED ? "ADDED" : "REMOVED"));
}
}
virtual void OnData(std::shared_ptr<stream::Data> data) override {
if(data->type == stream::DataType::VIDEO) {
auto videoData = std::dynamic_pointer_cast<stream::VideoData>(data);
// LOG_INFO("VideoData[w-h]: ", videoData->w, "-", videoData->h);
_renderer.OnFrame(videoData->w, videoData->h, videoData->frameData, "test");
}
if(data->type == stream::DataType::AUDIO) {
auto audioData = std::dynamic_pointer_cast<stream::AudioData>(data);
// LOG_INFO("AudioData[w-h]");
}
}
private:
RTCVideoRendererImpl _renderer;
};

int main(int argc, char** argv) {
auto params = getParamsList(argc, argv);
if(params.size() != 5) {
Expand All @@ -84,25 +185,69 @@ int main(int argc, char** argv) {
);
event::EventApi eventApi = event::EventApi::create(connection);
stream::StreamApi streamApi = stream::StreamApi::create(connection, eventApi);
RTCVideoRendererImpl r = RTCVideoRendererImpl("Remote");
stream::StreamSettings ssettings {
.OnFrame=[&](int64_t w, int64_t h, std::shared_ptr<privmx::endpoint::stream::Frame> frame, const std::string id) {
r.OnFrame(w, h, frame, id);
}
};
auto streamlist = streamApi.listStreams(streamRoomId);
core::EventQueue eventQueue = core::EventQueue::getInstance();
auto onTrack = std::make_shared<OnTrackImpl>();
streamApi.setOnTrackInterface(onTrack);
stream::StreamSettings ssettings {};
streamApi.subscribeFor({
streamApi.buildSubscriptionQuery(stream::EventType::STREAMROOM_UPDATE, stream::EventSelectorType::STREAMROOM_ID, streamRoomId),
streamApi.buildSubscriptionQuery(stream::EventType::STREAMROOM_DELETE, stream::EventSelectorType::STREAMROOM_ID, streamRoomId),
streamApi.buildSubscriptionQuery(stream::EventType::STREAM_PUBLISH, stream::EventSelectorType::STREAMROOM_ID, streamRoomId),
streamApi.buildSubscriptionQuery(stream::EventType::STREAM_UNPUBLISH, stream::EventSelectorType::STREAMROOM_ID, streamRoomId),
streamApi.buildSubscriptionQuery(stream::EventType::STREAM_JOIN, stream::EventSelectorType::STREAMROOM_ID, streamRoomId),
streamApi.buildSubscriptionQuery(stream::EventType::STREAM_LEAVE, stream::EventSelectorType::STREAMROOM_ID, streamRoomId),
});
std::vector<stream::StreamSubscription> streamsId;
for(int i = 0; i < streamlist.size(); i++) {
std::cout << "streamlist[" << i << "]:" << streamlist[i].streamId << std::endl;
streamsId.push_back(stream::StreamSubscription{streamlist[i].streamId, std::nullopt});
{
auto streamlist = streamApi.listStreams(streamRoomId);
for(auto stream : streamlist) {
std::cout << "stream:" << stream.id << std::endl;
for(auto track : stream.tracks) {
streamsId.push_back(stream::StreamSubscription{stream.id, track.mid});
}
break;
}
}
streamApi.joinStreamRoom(streamRoomId);
streamApi.subscribeToRemoteStreams(streamRoomId, streamsId, ssettings);

auto eventThread = std::thread([&](){
while (true) {
auto eventHolder = eventQueue.waitEvent();
if(core::Events::isLibBreakEvent(eventHolder)) return;
if(stream::Events::isStreamUpdatedEvent(eventHolder)) {
auto streamUpdatedEvent = stream::Events::extractStreamUpdatedEvent(eventHolder).data;
auto streamsModified = streamUpdatedEvent.streamsModified;

std::vector<stream::StreamSubscription> toAddstreamsId;
std::vector<stream::StreamSubscription> toRemovestreamsId;
for(auto stream : streamsModified) {
for(auto track : stream.tracks) {
if(track.after.has_value()) {
if(track.after.value().disabled.has_value()) {
toRemovestreamsId.push_back({stream.streamId, track.after.value().mid});
} else {
toAddstreamsId.push_back({stream.streamId, track.after.value().mid});
}
}

}
}
if(toAddstreamsId.size() > 0 || toRemovestreamsId.size() > 0) {
streamApi.modifyRemoteStreamsSubscriptions(streamRoomId, toAddstreamsId, toRemovestreamsId, ssettings);
}
}
}
});

while (true) {std::this_thread::sleep_for(std::chrono::seconds(5));}
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(5));
}

streamApi.unsubscribeFromRemoteStreams(streamRoomId, streamsId);
std::this_thread::sleep_for(std::chrono::seconds(5));
eventQueue.emitBreakEvent();
if(eventThread.joinable()) eventThread.join();
connection.disconnect();

} catch (const core::Exception& e) {
Expand Down
Loading