diff --git a/signalling/streaming-signalling-server.py b/signalling/streaming-signalling-server.py new file mode 100755 index 0000000..7724a9e --- /dev/null +++ b/signalling/streaming-signalling-server.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +# +# streaming signalling server +# +# Copyright (C) 2019 Codethink Ltd. +# Copyright (C) 2017 Centricular Ltd. +# +# Author: Nirbheek Chauhan +# Author: Aiden Jeffrey +# + +import argparse +import asyncio +import json +import logging +import os +import random +import sys +import websockets + +from concurrent.futures._base import TimeoutError + +parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) +parser.add_argument('--addr', default='0.0.0.0', help='Address to listen on') +parser.add_argument('--port', default=8443, type=int, help='Port to listen on') +parser.add_argument('--keepalive-timeout', dest='keepalive_timeout', default=30, type=int, help='Timeout for keepalive (in seconds)') + +options = parser.parse_args(sys.argv[1:]) + +ADDR_PORT = (options.addr, options.port) +KEEPALIVE_TIMEOUT = options.keepalive_timeout +MAX_NUM_CLIENTS = 1000 + + +class Peer: + def __init__(self, web_socket, in_session=False): + self.web_socket = web_socket + self.remote_address = web_socket.remote_address + + self.in_session = in_session + + +class SignalError(Exception): + pass + + +clients = dict() +media_server = None + + +async def report_and_log_error(msg, socket=None): + logger.error(msg) + if socket: + await socket.send(msg) + + +async def add_client(client): + while True: + uid = random.randint(0, MAX_NUM_CLIENTS) + if uid not in clients: + break + clients[uid] = client + return uid + + +############### Helper functions ############### + +async def recv_msg_ping(web_socket): + """ + Wait for a message forever, and send a regular ping to prevent bad routers + from closing the connection. 
+ """ + msg = None + while msg is None: + try: + msg = await asyncio.wait_for(web_socket.recv(), KEEPALIVE_TIMEOUT) + except TimeoutError: + print('Sending keepalive ping to {!r} in recv'.format( + web_socket.remote_address)) + await web_socket.ping() + return msg + + +async def remove_peer(uid): + global media_server + if uid in clients: + # Let gstreamer know that this client died + if media_server is not None: + await media_server.web_socket.send( + "UNBIND-SESSION-CLIENT {}".format(uid)) + else: + raise SignalError("No media server to deregister from") + await clients[uid].web_socket.close() + del clients[uid] + print("Cleaned up and disconnected from client {}".format(uid)) + elif uid == None: + if media_server is not None: + await media_server.web_socket.close() + print("Cleaned up and disconnected from media server") + media_server = None + +############### Handler functions ############### + +async def connection_handler(uid, peer, is_server=False): + """ Once the peer is registered, the websocket's messages are bound + to this function + """ + global clients, media_server + + while True: + # Receive command, wait forever if necessary + msg = await recv_msg_ping(peer.web_socket) + print("connection_handler: ", msg) + if is_server: + if msg.startswith("SESSION "): + print("SESSION MESSAGE: {}".format(msg)) + _, client_uid, server_msg = msg.split(maxsplit=2) + try: + client = clients[int(client_uid)] + except (KeyError, ValueError): + print("CLIENTS:", clients, type(client_uid)) + await report_and_log_error( + "ERROR session: unknown client uid msg {}".format(msg)) + else: + if server_msg == "BOUND": + client.in_session = True + elif server_msg == "UNBOUND": + client.in_session = False + else: + await report_and_log_error( + "ERROR Unknown SESSION report from" + " server {}".format(msg)) + else: + print("FWD MESSAGE: {}".format(msg)) + # message should be forwarded to client, the uid of which + # should be embedded in message + try: + msg_data = json.loads(msg) + except json.JSONDecodeError: + await peer.web_socket.send( + "ERROR bad data sent" + " msg {}".format(msg)) + if ("client_uid" not in msg_data) or ( + msg_data["client_uid"] not in clients): + await report_and_log_error( + "ERROR forward unknown client uid" + " msg {}".format(msg)) + else: + client_uid = msg_data["client_uid"] + print("media-server -> {}: {}".format(client_uid, msg)) + await clients[client_uid].web_socket.send(msg) + + else: + # client connection + print("{} -> media-server: {}".format(uid, msg)) + await media_server.web_socket.send(msg) + + +async def register_peer(web_socket): + """ + Exchange hello, register peer + """ + global media_server + + new_peer = Peer(web_socket, False) + msg = await web_socket.recv() + print("register_peer: ", msg) + uid = None + if msg == "REGISTER CLIENT": + # TODO: do we need to restrict clients somehow (i.e. one per address)? 
+ uid = await add_client(new_peer) + # Send back the assined uid + await web_socket.send("ASSIGNED UID {}".format(uid)) + # inform gstreamer of the client + if media_server is not None: + msg = "BIND-SESSION-CLIENT {}".format(uid) + print("BIND {} -> media-server: {}".format(uid, msg)) + await media_server.web_socket.send(msg) + else: + raise SignalError("Registering of clients only possible after media" + " server registered") + print("Registered client {} at {}".format(uid, + web_socket.remote_address)) + elif msg == "REGISTER MEDIA": + if media_server is None: + media_server = new_peer + else: + await web_socket.close(code=1002, reason="already connected to a" + " gstreamer media source") + raise Exception("Multiple media server connections detected {!r}".format( + new_peer.remote_address)) + print("Registered media server at {}".format( + web_socket.remote_address)) + await web_socket.send("REGISTERED") + else: + await web_socket.close(code=1002, reason='invalid protocol') + raise Exception("Invalid hello from {}".format( + new_peer.remote_address)) + + return uid, new_peer + + +async def handler(web_socket, path): + """ + All incoming messages are handled here. @path is unused. + """ + print("Connected to {!r}".format(web_socket.remote_address)) + # TODO: is this a bug in waiting? can't work out how this doesn't swallow + # all the messages + uid, new_peer = await register_peer(web_socket) + try: + await connection_handler(uid, new_peer, is_server=(uid is None)) + except websockets.ConnectionClosed: + print("Connection to peer {!r} closed, exiting handler".format( + web_socket.remote_address)) + finally: + await remove_peer(uid) + + +print("Listening on http://{}:{}".format(*ADDR_PORT)) +# Websocket server +web_socket_d = websockets.serve(handler, *ADDR_PORT, ssl=None, max_queue=16) + +logger = logging.getLogger('webrtc.server') + +logger.setLevel(logging.ERROR) +logger.addHandler(logging.StreamHandler()) + +asyncio.get_event_loop().run_until_complete(web_socket_d) +asyncio.get_event_loop().run_forever() diff --git a/streamer/README.md b/streamer/README.md new file mode 100644 index 0000000..30e5498 --- /dev/null +++ b/streamer/README.md @@ -0,0 +1,23 @@ +Overview +======== + +This streaming example allows you to connect multiple clients to a single +streaming endpoint (or `mountpoint`). + +Building +======== + +To build, just run the [Makefile](gst/Makefile). 
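For example, from the `gst/` directory (targets and binary names taken from the Makefile; the GStreamer, libsoup-2.4 and json-glib development packages must be installed for `pkg-config` to succeed, see `PKG_CONFIG` in the Makefile):
```
cd gst
make            # release build, produces ./streaming-app
make debug      # optional: -O0 -g build, produces ./streaming-app-debug
```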
+ +Usage +===== + +To use, serve the files in the `/js` directory (with `python3 -m http.server` +for example), and run the standalone signalling server: +``` +python3 [./streaming-signalling-server.py](../signalling/streaming-signalling-server.py) +``` +Then run the compiled binary in the `gst/` directory: +``` +./streaming-app +``` diff --git a/streamer/gst/Makefile b/streamer/gst/Makefile new file mode 100644 index 0000000..2dafa20 --- /dev/null +++ b/streamer/gst/Makefile @@ -0,0 +1,68 @@ +BINARY_RELEASE = streaming-app +BINARY_DEBUG = $(BINARY_RELEASE)-debug + +INCLUDE = -I include +PKG_CONFIG = glib-2.0 gstreamer-1.0 libsoup-2.4 json-glib-1.0 glib-2.0 gstreamer-sdp-1.0 gstreamer-webrtc-1.0 +VERSION = $(shell git describe --abbrev=8 --dirty --always --tags) + +CFLAGS_COMMON = $(INCLUDE) -Wall -Wextra -Werror -MD -MP -fPIC `pkg-config --cflags $(PKG_CONFIG)` -DVERSION=\"$(VERSION)\" +CFLAGS_RELEASE = -O3 -s -fexpensive-optimizations $(CFLAGS_COMMON) +CFLAGS_DEBUG = -O0 -g $(CFLAGS_COMMON) +LDFLAGS = `pkg-config --libs $(PKG_CONFIG)` + +SRC = $(shell find src -type f) +OBJ_RELEASE = $(patsubst src/%.c, .build/%.o, $(SRC)) +DEP_RELEASE = $(patsubst src/%.c, .build/%.d, $(SRC)) +OBJ_DEBUG = $(patsubst src/%.c, .build/debug/%.o, $(SRC)) +DEP_DEBUG = $(patsubst src/%.c, .build/debug/%.d, $(SRC)) + +PREFIX = /usr/local +BINDIR = $(PREFIX)/bin +CFGDIR = $(PREFIX)/share/ct-camera +DOCSDIR = $(PREFIX)/share/doc/ct-camera/pipelines +SYSTEMDDIR = /lib/systemd/system/ + +CONFIGS = $(shell find -type f -name "*.cfg") + +all : release + +release : $(BINARY_RELEASE) + +debug : $(BINARY_DEBUG) + +.build/%.o: src/%.c + mkdir -p $(basename $@) + $(CC) -c $(CFLAGS_RELEASE) -o $@ $< + +.build/debug/%.o: src/%.c + mkdir -p $(basename $@) + $(CC) -c $(CFLAGS_DEBUG) -o $@ $< + +$(BINARY_RELEASE) : $(OBJ_RELEASE) + $(CC) -o $@ $^ $(LDFLAGS) + +$(BINARY_DEBUG) : $(OBJ_DEBUG) + $(CC) -o $@ $^ $(LDFLAGS) + +clean: + rm -rf .build $(BINARY_RELEASE) $(BINARY_DEBUG) + +install: $(BINARY_RELEASE) + install -d $(DESTDIR)/$(BINDIR) + install $(BINARY_RELEASE) $(DESTDIR)/$(BINDIR) + install -d -m 777 $(DESTDIR)/$(CFGDIR) + install -m 777 $(CONFIGS) $(DESTDIR)/$(CFGDIR) + install -d -m 777 $(DESTDIR)/$(DOCSDIR) + install -m 777 README.md $(DESTDIR)/$(DOCSDIR) + mkdir -p $(DESTDIR)/$(SYSTEMDDIR)/ + install -m 644 ct-camera.service $(DESTDIR)/$(SYSTEMDDIR)/ + ln -nsf test.cfg $(DESTDIR)/$(CFGDIR)/default.cfg + +uninstall: + rm -rf $(addprefix $(BINDIR)/,$(BINARY_RELEASE)) + rm -rf $(addprefix $(CFGDIR)/,$(CONFIGS)) + rm -rf $(DOCSDIR) + +-include $(DEP) $(DEP_DEBUG) + +.PHONY : all release debug clean install uninstall diff --git a/streamer/gst/include/camera-pipeline.h b/streamer/gst/include/camera-pipeline.h new file mode 100644 index 0000000..924ffe1 --- /dev/null +++ b/streamer/gst/include/camera-pipeline.h @@ -0,0 +1,47 @@ +#ifndef __CAMERA_PIPELINE_H__ +#define __CAMERA_PIPELINE_H__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct camera_pipe_s camera_pipe_t; + +#include "webrtc-mountpoint.h" + +struct camera_pipe_s { + gint camera_id; + + GstElement *pipeline; + GstElement *video_testsrc, *video_convert; + GstElement *queue, *video_encoder; + + GstElement *rtp_payloader; + GstElement *webrtc_queue, *webrtc_tee; + + GstElement *fakesink; + + webrtc_mp_t *webrtc_mp; + + GstElement *source_caps_filter, *encode_caps_filter; + + gboolean playing; +}; + +struct camera_pipe_list_s { + camera_pipe_t **pipeline; + size_t pipeline_count; +}; + +camera_pipe_t* 
camera_pipe_create (); +void camera_pipe_delete (camera_pipe_t* data); + +gboolean camera_pipe_set_state(camera_pipe_t* data, + GstState state); +#endif diff --git a/streamer/gst/include/webrtc-mountpoint.h b/streamer/gst/include/webrtc-mountpoint.h new file mode 100644 index 0000000..c95bf40 --- /dev/null +++ b/streamer/gst/include/webrtc-mountpoint.h @@ -0,0 +1,44 @@ +#ifndef __WEBRTC_MP_H__ +#define __WEBRTC_MP_H__ + +#include + +#define GST_USE_UNSTABLE_API +#include + +#include + +typedef struct webrtc_mp_s webrtc_mp_t; + +#include "camera-pipeline.h" +#include "webrtc-server.h" + +/* + * Webrtc mountpoints + * each pipeline has a mountpoint, with multiple webrtcbins (one for each + * client connection) + */ +struct webrtc_mp_s { + camera_pipe_t* pipeline_ref; + GstElement** webrtcbins; + GstPad** tee_pads; + GstPad** bin_pads; + webrtc_session_t** session_refs; + size_t bin_count; +}; + +webrtc_mp_t* webrtc_mp_create(camera_pipe_t* pipeline); +void webrtc_mp_delete(webrtc_mp_t* mountpoint); + +gboolean webrtc_mp_add_element (webrtc_mp_t* mountpoint, + webrtc_session_t* session); +GstElement* webrtc_mp_get_element (webrtc_mp_t* mountpoint, + guint client_uid); +gboolean webrtc_mp_remove_element(webrtc_mp_t* mountpoint, + guint client_uid); +webrtc_session_t* webrtc_mp_get_session (webrtc_mp_t* mountpoint, + guint client_uid); + +gchar* get_string_from_json_object(JsonObject* object); + +#endif \ No newline at end of file diff --git a/streamer/gst/include/webrtc-server.h b/streamer/gst/include/webrtc-server.h new file mode 100644 index 0000000..84116e5 --- /dev/null +++ b/streamer/gst/include/webrtc-server.h @@ -0,0 +1,67 @@ +#ifndef __WEBRTC_SERVER_H__ +#define __WEBRTC_SERVER_H__ + +#include +#include + +/* For signalling */ +#include +#include + +#include "camera-pipeline.h" + +#include + +typedef enum { + SERVER_STATE_UNKNOWN = 0, + SERVER_STATE_ERROR = 1, /* generic error */ + SERVER_STATE_CONNECTING = 2, + SERVER_STATE_CONNECTION_ERROR = 3, + SERVER_STATE_CONNECTED = 4, /* Ready to register */ + SERVER_STATE_REGISTERING = 5, + SERVER_STATE_REGISTRATION_ERROR = 6, + SERVER_STATE_REGISTERED = 7, /* Ready to accept client connections */ + SERVER_STATE_CLOSED = 8, /* server connection closed somewhere */ + SERVER_STATE_TYPE_COUNT +} webrtc_server_state_e; + +typedef enum { + CLIENT_CONNECTING = 0, + CLIENT_CONNECTION_ERROR = 1, + CLIENT_CONNECTED = 2, + STREAM_MOUNTED = 3, + STREAM_NEGOTIATING = 4, + STREAM_STARTED = 5, + STREAM_STOPPING = 6, + STREAM_STOPPED = 7, + STREAM_ERROR = 8, + SESSION_STATE_TYPE_COUNT +} webrtc_session_state_e; + +/* + * Session contains data encapsulating a client connection + * mountpoint_id - the camera_id in camera_pipe_t + * webrtcbin_index - index into webrtc_mp_t + */ +typedef struct webrtc_session_s { + guint client_uid; + webrtc_session_state_e state; + gboolean active; + SoupWebsocketConnection* ws_conn_ref; + GstElement* webrtcbin_ref; + GObject* send_channel; + GObject* receive_channel; + GstClockTime join_time; +} webrtc_session_t; + + +void webrtc_websocket_controller_setup ( + camera_pipe_t* pipeline); +void webrtc_websocket_controller_teardown(); + + +gboolean set_session_webrtcbinref( + webrtc_session_t* session, + camera_pipe_t* pipeline); + +#endif diff --git a/streamer/gst/src/camera-pipeline.c b/streamer/gst/src/camera-pipeline.c new file mode 100644 index 0000000..a6ba675 --- /dev/null +++ b/streamer/gst/src/camera-pipeline.c @@ -0,0 +1,182 @@ +#include "camera-pipeline.h" + +#define UNUSED(x) (void)(x) + +#define RAW_STREAM_FORMAT 
"byte-stream" + +#define FRAME_WIDTH 320 +#define FRAME_HEIGHT 240 +#define FRAMERATE 30 + + +static gboolean set_properties(camera_pipe_t *data); + +camera_pipe_t* camera_pipe_create() +{ + /* Initialise stuff */ + camera_pipe_t *data = (camera_pipe_t*)malloc( + sizeof(camera_pipe_t)); + + if (!data) { + return NULL; + } + + /* Create Gst elements */ + data->video_testsrc = gst_element_factory_make("videotestsrc", + "videotestsrc"); + data->video_convert = gst_element_factory_make("videoconvert", + "videoconvert"); + data->queue = gst_element_factory_make("queue", "queue"); + +#ifdef __aarch64__ + data->video_encoder = gst_element_factory_make("omxh264enc", + "video_encoder"); +#else + data->video_encoder = gst_element_factory_make("vaapih264enc", + "video_encoder"); + if (!data->video_encoder) { + data->video_encoder = gst_element_factory_make("x264enc", + "video_encoder"); + } +#endif + + data->rtp_payloader = gst_element_factory_make("rtph264pay", + "rtp_payloader"); + + + data->webrtc_queue = gst_element_factory_make("queue", "webrtc_queue"); + data->webrtc_tee = gst_element_factory_make("tee", "webrtc_tee"); + data->fakesink = gst_element_factory_make("fakesink", "fakesink"); + + + data->webrtc_mp = webrtc_mp_create(data); + if (!data->webrtc_mp) { + g_error("Failed to create webrtc mountpoint!\n"); + return NULL; + } + + /* Make capsfilters */ + data->encode_caps_filter = gst_element_factory_make("capsfilter", + "encode_caps_filter"); + + data->source_caps_filter = gst_element_factory_make("capsfilter", + "source_caps_filter"); + + data->pipeline = gst_pipeline_new("camera-pipeline"); + + if (!data->pipeline || !data->video_testsrc || !data->video_convert || + !data->queue || !data->video_encoder || !data->rtp_payloader || + !data->webrtc_queue || !data->webrtc_tee || !data->fakesink || + !data->encode_caps_filter || !data->source_caps_filter) { + // TODO: add error goto + g_print("Not all elements could be created!\n"); + return NULL; + } + + if (!set_properties(data)) { + camera_pipe_delete(data); + return NULL; + } + /* Initialise playing state */ + data->playing = FALSE; + + /* Add src->payloader elements */ + gst_bin_add_many(GST_BIN (data->pipeline), data->video_testsrc, + data->source_caps_filter, data->video_convert, + data->queue, data->video_encoder, + data->encode_caps_filter, data->rtp_payloader, + data->webrtc_queue, data->webrtc_tee, data->fakesink, + NULL); + + + /* link elements */ + + if (!gst_element_link_many(data->video_testsrc, data->source_caps_filter, + data->video_convert, data->queue, + data->video_encoder, data->encode_caps_filter, + data->rtp_payloader, data->webrtc_queue, + data->webrtc_tee, data->fakesink, NULL)) { + GST_ERROR("Elements could not be linked!\n"); + camera_pipe_delete(data); + return NULL; + } + + return data; +} + + +void camera_pipe_delete(camera_pipe_t* data) +{ + if (!data) { + return; + } + + gst_element_set_state(data->pipeline, GST_STATE_NULL); + gst_object_unref(data->pipeline); + + webrtc_mp_delete(data->webrtc_mp); + free(data); +} + + +gboolean camera_pipe_set_state(camera_pipe_t* data, GstState state) +{ + /* Start playing the pipeline */ + GST_INFO("Setting pipeline state to %d\n", state); + GstStateChangeReturn ret = gst_element_set_state(data->pipeline, state); + if (ret == GST_STATE_CHANGE_FAILURE) { + GST_ERROR ("Unable to set the pipeline to the state #%d.\n", state); + return FALSE; + } + if ((state == GST_STATE_PLAYING) && (!data->playing)) { + GST_INFO("Pipeline -> playing state\n"); + data->playing = TRUE; + } + else 
{ + GST_INFO("Pipeline -> stopped state\n"); + data->playing = FALSE; + } + return TRUE; +} + + +static gboolean set_properties(camera_pipe_t *data) +{ + GstCaps *encode_caps, *source_caps; + + GString *encode_caps_str = g_string_new(""); + GString *source_caps_str = g_string_new(""); + + g_string_printf(encode_caps_str, + "video/x-h264, stream-format=(string)%s, " + "profile=baseline", + RAW_STREAM_FORMAT); + + g_string_printf(source_caps_str, + "video/x-raw, width=(int)%d, " + "height=(int)%d, format=(string)I420, " + "framerate=(fraction)%d/1", + FRAME_WIDTH, FRAME_HEIGHT, + FRAMERATE); + + encode_caps = gst_caps_from_string(encode_caps_str->str); + g_string_free(encode_caps_str, TRUE); + + source_caps = gst_caps_from_string(source_caps_str->str); + g_string_free(source_caps_str, TRUE); + + if (!encode_caps || !source_caps) { + GST_ERROR("Unable to create caps!\n"); + return FALSE; + } + + /* Set element properties */ + g_object_set(data->encode_caps_filter, "caps", encode_caps, NULL); + g_object_set(data->source_caps_filter, "caps", source_caps, NULL); + + g_object_set(data->rtp_payloader, "config-interval", 10, "pt", 96, NULL); + + g_object_set(data->video_testsrc, "is-live", TRUE, NULL); + + return TRUE; +} \ No newline at end of file diff --git a/streamer/gst/src/main.c b/streamer/gst/src/main.c new file mode 100644 index 0000000..951788e --- /dev/null +++ b/streamer/gst/src/main.c @@ -0,0 +1,74 @@ +#include +#include +#include +#include + +#include +#include +#include + +#include "camera-pipeline.h" +#include "webrtc-server.h" + +typedef struct custom_data_s { + camera_pipe_t *pipeline; + GMainLoop *main_loop; +} custom_data_t; + + +gboolean exit_sighandler(GMainLoop *main_loop) +{ + g_info("Caught signal, stopping mainloop\n"); + g_main_loop_quit(main_loop); + return TRUE; +} + + +static void custom_data_cleanup(custom_data_t *data) +{ + if (data->main_loop) { + g_main_loop_unref(data->main_loop); + } + + camera_pipe_delete(data->pipeline); + + webrtc_websocket_controller_teardown(); +} + + +int main(int argc, char *argv[]) +{ + custom_data_t data; + + /* Initialise stuff */ + gst_init(&argc, &argv); + memset (&data, 0, sizeof(data)); + + data.pipeline = camera_pipe_create(); + + webrtc_websocket_controller_setup(data.pipeline); + + if (!data.pipeline) { + custom_data_cleanup(&data); + } + + /* Create a GLib Main Loop and set up interupt handlers */ + data.main_loop = g_main_loop_new(NULL, FALSE); + + g_unix_signal_add(SIGINT , (void*)exit_sighandler, data.main_loop); + g_unix_signal_add(SIGTERM, (void*)exit_sighandler, data.main_loop); + + /* Play the pipeline */ + if (!camera_pipe_set_state(data.pipeline, GST_STATE_PLAYING)) { + custom_data_cleanup(&data); + return -1; + } + + /* Run main loop */ + g_main_loop_run(data.main_loop); + + /* Free resources */ + custom_data_cleanup(&data); + return 0; + +} \ No newline at end of file diff --git a/streamer/gst/src/webrtc-mountpoint.c b/streamer/gst/src/webrtc-mountpoint.c new file mode 100644 index 0000000..086cea1 --- /dev/null +++ b/streamer/gst/src/webrtc-mountpoint.c @@ -0,0 +1,479 @@ +/* + * Mountpoint object for webrtc clients (browsers) to connect to. + * One of these per pipeline. 
+ */ +#include "webrtc-mountpoint.h" +#include "webrtc-server.h" + +#define UNUSED(x) (void)(x) + +#define STUN_SERVER "stun://stun.l.google.com:19302" + +gchar* get_string_from_json_object(JsonObject* object) +{ + JsonNode *root; + JsonGenerator *generator; + gchar *text; + + /* Make it the root node */ + root = json_node_init_object(json_node_alloc(), object); + generator = json_generator_new(); + + json_generator_set_root(generator, root); + text = json_generator_to_data(generator, NULL); + + /* Release everything */ + g_object_unref(generator); + json_node_free(root); + return text; +} + + +static void send_ice_candidate_message( + GstElement* webrtc, + guint mlineindex, + gchar* candidate, + void* userdata) +{ + webrtc_session_t* session = (webrtc_session_t*)userdata; + UNUSED(webrtc); + gchar *text; + JsonObject *ice, *msg; + + if (session->state < STREAM_NEGOTIATING) { + g_printerr("Can't send ICE, not in call %u\n", session->client_uid); + return; + } + + ice = json_object_new (); + json_object_set_string_member(ice, "candidate", candidate); + json_object_set_int_member (ice, "sdpMLineIndex", mlineindex); + + msg = json_object_new (); + json_object_set_object_member (msg, "ice", ice); + json_object_set_int_member (msg, "client_uid", session->client_uid); + text = get_string_from_json_object(msg); + json_object_unref (msg); + + if (soup_websocket_connection_get_state(session->ws_conn_ref) != + SOUP_WEBSOCKET_STATE_OPEN) { + g_printerr("No websocket connection for client %u\n", + session->client_uid); + return; + } + + soup_websocket_connection_send_text(session->ws_conn_ref, text); + g_free (text); +} + + +static void send_sdp_offer( + GstWebRTCSessionDescription* offer, + void* userdata) +{ + webrtc_session_t* session = (webrtc_session_t*)userdata; + gchar *text, *response_text; + JsonObject *msg, *sdp; + + if (session->state < STREAM_NEGOTIATING) { + g_printerr("Can't send offer, not in call %u\n", session->client_uid); + return; + } + + text = gst_sdp_message_as_text(offer->sdp); + + sdp = json_object_new (); + json_object_set_string_member(sdp, "type", "offer"); + json_object_set_string_member(sdp, "sdp", text); + g_free (text); + + msg = json_object_new (); + json_object_set_object_member(msg, "sdp", sdp); + json_object_set_int_member (msg, "client_uid", session->client_uid); + + response_text = get_string_from_json_object(msg); + json_object_unref (msg); + + soup_websocket_connection_send_text(session->ws_conn_ref, response_text); + g_free (response_text); +} + + +/* Offer created by our pipeline, to be sent to the peer */ +static void on_offer_created( + GstPromise* promise, + void* userdata) +{ + webrtc_session_t* session = (webrtc_session_t*)userdata; + GstWebRTCSessionDescription *offer = NULL; + const GstStructure *reply; + + if (session->state != STREAM_NEGOTIATING) { + g_printerr("Creating offer for session not in negotiation\n"); + return; + } + + if (!session->webrtcbin_ref) { + g_printerr("No webrtcbin associated with session\n"); + return; + } + + if (gst_promise_wait(promise) != GST_PROMISE_RESULT_REPLIED) { + g_printerr("No reply to promise\n"); + return; + } + reply = gst_promise_get_reply(promise); + gst_structure_get(reply, "offer", + GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL); + gst_promise_unref(promise); + + promise = gst_promise_new(); + + g_signal_emit_by_name(session->webrtcbin_ref, "set-local-description", + offer, promise); + gst_promise_interrupt(promise); + gst_promise_unref (promise); + + /* Send offer to peer */ + send_sdp_offer(offer, 
session); + gst_webrtc_session_description_free(offer); +} + + +static void on_negotiation_needed( + GstElement* webrtcbin, + void* userdata) +{ + webrtc_session_t* session = (webrtc_session_t*)userdata; + GstPromise *promise; + + session->state = STREAM_NEGOTIATING; + promise = gst_promise_new_with_change_func( + on_offer_created, userdata, NULL); + g_signal_emit_by_name(webrtcbin, "create-offer", NULL, promise); +} + + +webrtc_mp_t* webrtc_mp_create(camera_pipe_t* pipeline) +{ + /* Initialise stuff */ + webrtc_mp_t *mountpoint = (webrtc_mp_t*)malloc( + sizeof(webrtc_mp_t)); + + if (!mountpoint) { + return NULL; + } + + mountpoint->webrtcbins = NULL; + mountpoint->tee_pads = NULL; + mountpoint->bin_pads = NULL; + mountpoint->session_refs = NULL; + + mountpoint->pipeline_ref = pipeline; + mountpoint->bin_count = 0; + + return mountpoint; +} + + +void webrtc_mp_delete(webrtc_mp_t* mountpoint) +{ + if (!mountpoint) { + return; + } + for (size_t i = 0; i < mountpoint->bin_count; i++) { + gst_object_unref(mountpoint->tee_pads[i]); + gst_object_unref(mountpoint->bin_pads[i]); + } + free(mountpoint); +} + + +static gboolean _webrtc_mp_reallocate(webrtc_mp_t* mountpoint) +{ + GstElement** webrtcbins = (GstElement**)realloc( + mountpoint->webrtcbins, + sizeof(GstElement*) * mountpoint->bin_count); + + webrtc_session_t** session_refs = (webrtc_session_t**)realloc( + mountpoint->session_refs, + sizeof(webrtc_session_t*) * mountpoint->bin_count); + + GstPad** tee_pads = (GstPad**)realloc( + mountpoint->tee_pads, + sizeof(GstPad*) * mountpoint->bin_count); + + GstPad** bin_pads = (GstPad**)realloc( + mountpoint->bin_pads, + sizeof(GstPad*) * mountpoint->bin_count); + + if ((mountpoint->bin_count != 0) && + (!webrtcbins || !session_refs || !tee_pads || !bin_pads)) { + g_printerr("ERROR: Failure allocating memory in mountpoint " + "(%p, %p, %p, %p)\n", + webrtcbins, session_refs, tee_pads, bin_pads); + free(webrtcbins); + free(session_refs); + free(tee_pads); + free(bin_pads); + return FALSE; + } + else { + mountpoint->webrtcbins = webrtcbins; + mountpoint->session_refs = session_refs; + mountpoint->tee_pads = tee_pads; + mountpoint->bin_pads = bin_pads; + return TRUE; + } +} + + +gboolean webrtc_mp_add_element( + webrtc_mp_t* mountpoint, + webrtc_session_t* session) +{ + if (!mountpoint) { + return FALSE; + } + if (!session) { + return FALSE; + } + gchar *name; + GstElement *webrtcbin; + GstPad *new_tee_pad, *webrtcbin_pad; + /* Check that this mountpoint doesn't already have this client connected */ + for (size_t i = 0; i < mountpoint->bin_count; i++) { + if (session->client_uid == mountpoint->session_refs[i]->client_uid) { + g_printerr("ERROR: Client %u already connected to mountpoint\n", + session->client_uid); + return FALSE; + } + } + + /* Create webrtcbin element */ + name = g_strdup_printf("webrtcbin_%u", session->client_uid); + webrtcbin = gst_element_factory_make("webrtcbin", name); + g_free(name); + + if (!webrtcbin) { + g_printerr("ERROR: Unable to create webrtcbin_%u\n", + session->client_uid); + return FALSE; + } + + if (!gst_bin_add(GST_BIN(mountpoint->pipeline_ref->pipeline), webrtcbin)) { + g_printerr("ERROR: Adding webrtcbin_%u to pipeline\n", + session->client_uid); + gst_object_unref(webrtcbin); + return FALSE; + } + + /* Get pads for linking */ + new_tee_pad = gst_element_get_request_pad( + mountpoint->pipeline_ref->webrtc_tee, "src_%u"); + webrtcbin_pad = gst_element_get_request_pad(webrtcbin, "sink_%u"); + + if (!new_tee_pad || !webrtcbin_pad) { + g_printerr( + "ERROR: Unable to 
get request pads for webrtcbin_%u linkage.\n", + session->client_uid); + return FALSE; + } + + /* Link new webrtcbin to webrtc_tee */ + GstPadLinkReturn pad_ret = gst_pad_link(new_tee_pad, webrtcbin_pad); + if (pad_ret != GST_PAD_LINK_OK) { + gst_object_unref(new_tee_pad); + gst_object_unref(webrtcbin_pad); + g_printerr("ERROR: Unable to link to new webrtcbin_%u: " + "GstPadLinkReturn: %d\n", + session->client_uid, pad_ret); + gst_debug_set_threshold_from_string ("*:2", TRUE); + return FALSE; + } + + GstStateChangeReturn ret; + GstState webrtc_target_state; + + if (mountpoint->pipeline_ref->playing) { + webrtc_target_state = GST_STATE_PLAYING; + } + else { + webrtc_target_state = GST_STATE_READY; + /* set state of pipeline to READY */ + ret = gst_element_set_state( + mountpoint->pipeline_ref->pipeline, GST_STATE_READY); + if (ret == GST_STATE_CHANGE_FAILURE) { + // gst_object_unref(webrtcbin); + g_printerr("ERROR: Unable to set the pipeline to READY state.\n"); + return FALSE; + } + } + /* set state of new webrtcbin element to match pipeline */ + ret = gst_element_set_state( + webrtcbin, webrtc_target_state); + if (ret == GST_STATE_CHANGE_FAILURE) { + // gst_object_unref(webrtcbin); + g_printerr( + "ERROR: Unable to set the webrtcbin_%u to playing state.\n", + session->client_uid); + return FALSE; + } + + /* Add reference to session */ + session->webrtcbin_ref = webrtcbin; + + /* This is the gstwebrtc entry point where we create the offer and so on. It + * will be called when the pipeline goes to PLAYING. */ + g_signal_connect(webrtcbin, "on-negotiation-needed", + G_CALLBACK (on_negotiation_needed), (void*)session); + /* We need to transmit this ICE candidate to the browser via the websockets + * signalling server. Incoming ice candidates from the browser need to be + * added by us too, see on_server_message() */ + g_signal_connect(webrtcbin, "on-ice-candidate", + G_CALLBACK (send_ice_candidate_message), (void*)session); + + /* Set properties */ + g_object_set(webrtcbin, "stun-server", STUN_SERVER, NULL); + + /* The order should be: + * - add new elements to pipeline + * - link new elements (but not yet to tee) + * - set state of elements to PLAYING (assuming the pipeline is in + * PLAYING), going from sink towards the queue. + * - lastly, link the queue to the tee. 
+ */ + + /* Update mountpoint with new element and session ref */ + mountpoint->bin_count++; + if (!_webrtc_mp_reallocate(mountpoint)) { + g_printerr("ERROR: Failure allocating memory in mountpoint\n"); + mountpoint->bin_count--; + return FALSE; + } + mountpoint->webrtcbins [mountpoint->bin_count - 1] = webrtcbin; + mountpoint->session_refs[mountpoint->bin_count - 1] = session; + mountpoint->tee_pads [mountpoint->bin_count - 1] = new_tee_pad; + mountpoint->bin_pads [mountpoint->bin_count - 1] = webrtcbin_pad; + + /* set state of pipeline to PLAYING */ + ret = gst_element_set_state( + mountpoint->pipeline_ref->pipeline, GST_STATE_PLAYING); + if (ret == GST_STATE_CHANGE_FAILURE) { + // gst_object_unref(webrtcbin); + g_printerr("ERROR: Unable to set the pipeline to PLAYING state.\n"); + return FALSE; + } + + return TRUE; +} + + +static gint _webrtc_mp_get_index( + webrtc_mp_t* mountpoint, + guint client_uid) +{ + gint index = -1; + for (gint i = 0; i < (gint)(mountpoint->bin_count); i++) { + if (mountpoint->session_refs[i]->client_uid == client_uid) { + index = i; + break; + } + } + return index; +} + + +GstElement* webrtc_mp_get_element( + webrtc_mp_t* mountpoint, + guint client_uid) +{ + if (!mountpoint) { + return NULL; + } + gint index = _webrtc_mp_get_index(mountpoint, client_uid); + if (index == -1) { + return NULL; + } + else { + return mountpoint->webrtcbins[index]; + } +} + + +gboolean webrtc_mp_remove_element( + webrtc_mp_t* mountpoint, + guint client_uid) +{ + if (!mountpoint) { + return FALSE; + } + gint index = _webrtc_mp_get_index(mountpoint, client_uid); + if (index == -1) { + return FALSE; + } + /* Unlink webrtcbin element */ + gst_pad_unlink (mountpoint->tee_pads[index], + mountpoint->bin_pads[index]); + gst_element_remove_pad(mountpoint->pipeline_ref->webrtc_tee, + mountpoint->tee_pads[index]); + + /* Set state to NULL*/ + GstStateChangeReturn ret = gst_element_set_state( + mountpoint->webrtcbins[index], GST_STATE_NULL); + + if (ret == GST_STATE_CHANGE_FAILURE) { + // gst_object_unref(webrtcbin); + g_printerr( + "WARNING: Unable to set the webrtcbin_%u to NULL state.\n", + client_uid); + return FALSE; + } + + if (!gst_bin_remove(GST_BIN (mountpoint->pipeline_ref->pipeline), + mountpoint->webrtcbins[index])) { + g_printerr( + "WARNING: Unable to remove the webrtcbin_%u from the pipeline.\n", + client_uid); + return FALSE; + } + + /* free memory allocated for bin and pads */ + gst_object_unref(mountpoint->tee_pads [index]); + gst_object_unref(mountpoint->bin_pads [index]); + + /* Loop through rest of mountpoint and move everything back one */ + for (size_t i = index; i < (mountpoint->bin_count - 1); i++) { + mountpoint->webrtcbins [i] = mountpoint->webrtcbins [i + 1]; + mountpoint->session_refs[i] = mountpoint->session_refs[i + 1]; + mountpoint->tee_pads [i] = mountpoint->tee_pads [i + 1]; + mountpoint->bin_pads [i] = mountpoint->bin_pads [i + 1]; + } + + /* reallocate memory */ + mountpoint->bin_count--; + if (!_webrtc_mp_reallocate(mountpoint)) { + g_printerr("ERROR: Failure reallocating memory in mountpoint\n"); + mountpoint->bin_count++; + return FALSE; + } + + return TRUE; +} + + +webrtc_session_t* webrtc_mp_get_session( + webrtc_mp_t* mountpoint, + guint client_uid) +{ + if (!mountpoint) { + return NULL; + } + gint index = _webrtc_mp_get_index(mountpoint, client_uid); + if (index == -1) { + return NULL; + } + else { + return mountpoint->session_refs[index]; + } +} \ No newline at end of file diff --git a/streamer/gst/src/webrtc-server.c b/streamer/gst/src/webrtc-server.c 
new file mode 100644 index 0000000..f08489e --- /dev/null +++ b/streamer/gst/src/webrtc-server.c @@ -0,0 +1,683 @@ +/* + * Server for negotiating and streaming a webrtc stream + * with a browser JS app. + * + * Author: Nirbheek Chauhan + * Author: Aiden Jeffrey + */ +#include "webrtc-mountpoint.h" +#include "webrtc-server.h" + +#define UNUSED(x) (void)(x) + +#define MAX_WEBRTC_SESSIONS 1000 + +static SoupWebsocketConnection *ws_conn = NULL; +static webrtc_server_state_e server_state = SERVER_STATE_UNKNOWN; + +static const gchar *server_url = "ws://localhost:8443"; +static webrtc_session_t sessions[MAX_WEBRTC_SESSIONS] = {0}; + + +static void log_error( + const gchar* msg, + webrtc_server_state_e desired_server_state, + webrtc_session_state_e session_state) +{ + if (msg) { + g_printerr("%s, server-state: %d, session_state: %d\n", + msg, desired_server_state, session_state); + } + /* Don't want a server in error continuing */ + if ((desired_server_state == SERVER_STATE_ERROR) || + (desired_server_state == SERVER_STATE_CONNECTION_ERROR) || + (desired_server_state == SERVER_STATE_REGISTRATION_ERROR)) { + server_state = SERVER_STATE_UNKNOWN; + } + else { + server_state = desired_server_state; + } +} + + +static gboolean register_with_server(void) +{ + gchar *hello; + + if (soup_websocket_connection_get_state(ws_conn) != + SOUP_WEBSOCKET_STATE_OPEN) { + return FALSE; + } + + g_print("Registering with signalling server\n"); + server_state = SERVER_STATE_REGISTERING; + + /* Register with the server with a random integer id. Reply will be received + * by on_server_message() */ + hello = g_strdup_printf("REGISTER MEDIA"); + soup_websocket_connection_send_text(ws_conn, hello); + g_free(hello); + + return TRUE; +} + + +static gint get_client_session_index(guint uid) +{ + gint i = -1; + gboolean found_uid = FALSE; + for (i = 0; i < MAX_WEBRTC_SESSIONS; i++) { + if ((sessions[i].client_uid == uid) && (sessions[i].active)) { + found_uid = TRUE; + break; + } + } + if (!found_uid) { + return -1; + } + else { + return i; + } +} + + +static gint bind_client_session(guint uid) +{ + gint i = -1; + gboolean found_space = FALSE; + for (i = 0; i < MAX_WEBRTC_SESSIONS; i++) { + if (!sessions[i].active) { + sessions[i].client_uid = uid; + sessions[i].state = CLIENT_CONNECTING; + sessions[i].active = TRUE; + found_space = TRUE; + break; + } + } + if (!found_space) { + return -1; + } + else { + return i; + } +} + + +static gint unbind_client_session(guint uid) +{ + gint session_index = get_client_session_index(uid); + if (session_index != -1) { + sessions[session_index].active = FALSE; + } + return session_index; +} + + +/* + * Extracts webrtcbin from webrtc_mp_t if exists + */ +gboolean set_session_webrtcbinref( + webrtc_session_t* session, + camera_pipe_t* pipeline) +{ + if (pipeline == NULL) { + return FALSE; + } + + GstElement* webrtcbin = webrtc_mp_get_element( + pipeline->webrtc_mp, session->client_uid); + + if (webrtcbin == NULL) { + gchar* msg = g_strdup_printf( + "ERROR: No webrtcbin element for client_uid %u", + session->client_uid); + log_error(msg, server_state, STREAM_ERROR); + g_free (msg); + return FALSE; + } + else { + session->webrtcbin_ref = webrtcbin; + return TRUE; + } +} + + +/* + * Looks for valid server messages and processes them + * returns -1 if error, 0 if no server message found, 1 if server message + * found and processed. 
+ */ +static gint process_server_messages( + gchar* text, + camera_pipe_t* pipeline, + SoupWebsocketConnection* ws_conn) +{ + gint client_uid = -1; + webrtc_session_t *session; + gint session_index; + + if (pipeline == NULL) { + return -1; + } + + if (g_strcmp0(text, "REGISTERED") == 0) { + /* Server has accepted our registration, we are ready to send commands */ + if (server_state != SERVER_STATE_REGISTERING) { + log_error ("ERROR: Received REGISTERED when " + "not registering", SERVER_STATE_ERROR, 0); + return -1; + } + server_state = SERVER_STATE_REGISTERED; + g_print("Registered with server\n"); + } else if (sscanf(text, "BIND-SESSION-CLIENT %d", &client_uid) == 1) { + /* Register client with webrtc server */ + /* check that this client doesn't already have a session */ + if (get_client_session_index((guint)client_uid) != -1) { + gchar* msg = g_strdup_printf( + "ERROR: client %d already in session", client_uid); + log_error(msg, server_state, CLIENT_CONNECTION_ERROR); + g_free (msg); + return -1; + } + /* no current session, so make one */ + if ((session_index = bind_client_session((guint)client_uid)) == -1) { + gchar* msg = g_strdup_printf( + "ERROR: no space to register %d client session", client_uid); + log_error(msg, server_state, CLIENT_CONNECTION_ERROR); + g_free (msg); + return -1; + } + session = &(sessions[session_index]); + if (session->state != CLIENT_CONNECTING) { + log_error("ERROR: Received BIND-SESSION-CLIENT when not connecting", + server_state, CLIENT_CONNECTION_ERROR); + return -1; + } + session->state = CLIENT_CONNECTED; + /* Inform signalling server that bind was successful */ + gchar* ret_text = g_strdup_printf( + "SESSION %u BOUND", session->client_uid); + + soup_websocket_connection_send_text(ws_conn, ret_text); + g_free (ret_text); + } else if (sscanf(text, "UNBIND-SESSION-CLIENT %d", &client_uid) == 1) { + /* De-register client with webrtc server */ + if ((session_index = unbind_client_session((guint)client_uid)) == -1) { + gchar* msg = g_strdup_printf( + "ERROR: no client session %d", client_uid); + log_error(msg, server_state, CLIENT_CONNECTION_ERROR); + g_free (msg); + return -1; + } + /* Destroy any webrtcbins associated with this session */ + session = &(sessions[session_index]); + + /* Remove webrtcbin element if one exists */ + if (!webrtc_mp_remove_element(pipeline->webrtc_mp, + session->client_uid)) { + gchar* msg = g_strdup_printf( + "WARNING: Problem removing client_uid %u", + session->client_uid); + log_error(msg, server_state, STREAM_ERROR); + g_free (msg); + } + + /* Inform signalling server that unbind was successful */ + gchar* ret_text = g_strdup_printf( + "SESSION %u UNBOUND", session->client_uid); + + soup_websocket_connection_send_text(ws_conn, ret_text); + g_free (ret_text); + + } else if (g_str_has_prefix (text, "ERROR")) { + /* Handle errors */ + switch (server_state) { + case SERVER_STATE_CONNECTING: { + server_state = SERVER_STATE_CONNECTION_ERROR; + break; + } + case SERVER_STATE_REGISTERING: { + server_state = SERVER_STATE_REGISTRATION_ERROR; + break; + } + default: { + break; + } + } + log_error(text, 0, 0); + /* although we received an error, the function itself didn't error */ + return 1; + } + else { + /* No server messages found */ + return 0; + } + return 1; +} + +/* + * Mirrors the json message back to the sender with success = FALSE and some + * error info. 
+ */ +static void _return_json_failure(SoupWebsocketConnection* ws_conn, + JsonObject* json_return, + gchar* msg) +{ + json_object_set_boolean_member( + json_return, "success", FALSE); + json_object_set_string_member( + json_return, "return-message", msg); + + gchar* text = get_string_from_json_object(json_return); + json_object_unref (json_return); + + soup_websocket_connection_send_text(ws_conn, text); + g_free (text); +} + + +/* + * Looks for valid json messages forwarded direct from client and processes + * them returns -1 if error, 0 if no server message found, 1 if json + * message found and processed. + */ +static gint process_json_messages( + gchar* text, + camera_pipe_t* pipeline, + SoupWebsocketConnection* ws_conn) +{ + JsonNode *root; + JsonObject *object, *child, *json_return;; + gchar *return_text; + + JsonParser *parser = json_parser_new (); + + if (!json_parser_load_from_data (parser, text, -1, NULL)) { + g_printerr ("Unknown message '%s', ignoring", text); + g_object_unref (parser); + return -1; + } + + root = json_parser_get_root (parser); + if (!JSON_NODE_HOLDS_OBJECT (root)) { + g_printerr ("Unknown json message '%s', ignoring", text); + g_object_unref (parser); + return -1; + } + object = json_node_get_object (root); + + gint client_uid = -1; + webrtc_session_t* session; + gint session_index; + + /* Get client_uid from json message */ + if (json_object_has_member(object, "client_uid")) { + client_uid = json_object_get_int_member(object, "client_uid"); + if ((session_index = get_client_session_index(client_uid)) != -1) { + session = &(sessions[session_index]); + } + else { + gchar* msg = g_strdup_printf( + "ERROR: trying to access non-existent client session %d", + client_uid); + log_error(msg, server_state, STREAM_ERROR); + g_free (msg); + return -1; + } + } + else { + log_error("ERROR: json message received without client_uid", + server_state, STREAM_ERROR); + return -1; + } + + /* Check type of JSON message */ + /* commands are of the form {command: {type: foo, data: bar} */ + if (json_object_has_member(object, "command")) { + const gchar* cmd_type; + child = json_object_get_object_member(object, "command"); + + if (!json_object_has_member(child, "type")) { + gchar* msg = "ERROR: received command without 'type'"; + log_error(msg, server_state, STREAM_ERROR); + + _return_json_failure(ws_conn, object, msg); + return -1; + } + + cmd_type = json_object_get_string_member(child, "type"); + if (g_strcmp0(cmd_type, "connect-to-mountpoint") == 0) { + /* Add session object to sessions and update mountpoint */ + if (session->state > CLIENT_CONNECTED) { + gchar* msg = g_strdup_printf( + "ERROR: client %u is already connected to", + session->client_uid); + log_error(msg, server_state, STREAM_ERROR); + _return_json_failure(ws_conn, object, msg); + g_free (msg); + return -1; + } + if (!pipeline->playing) { + gchar* msg = g_strdup_printf( + "ERROR: mountpoint is not playing"); + log_error(msg, server_state, STREAM_ERROR); + _return_json_failure(ws_conn, object, msg); + g_free (msg); + return -1; + } + + /* Update session and create webrtcbin element */ + session->state = STREAM_MOUNTED; + + /* Add element (webrtcbin) to mountpoint for this client session. 
+ * This element will become active when the element sees the + * `on-negotiation-needed` signal (either when pipeline -> PLAYING, + * or we put the signal on manually) + */ + if (!webrtc_mp_add_element(pipeline->webrtc_mp, session)) { + gchar* msg = g_strdup_printf( + "ERROR: Adding webrtcbin element to " + "mountpoint for client %u", + session->client_uid); + log_error(msg, server_state, STREAM_ERROR); + _return_json_failure(ws_conn, object, msg); + g_free (msg); + return -1; + } + + /* Return success message */ + json_return = object; + json_object_set_boolean_member( + json_return, "success", TRUE); + + return_text = get_string_from_json_object(json_return); + + soup_websocket_connection_send_text(ws_conn, return_text); + g_free (return_text); + } + else if (g_strcmp0(cmd_type, "disconnect-mountpoint") == 0) { + if (session->state <= CLIENT_CONNECTED) { + gchar* msg = g_strdup_printf( + "ERROR: client %u is not currently connected to a stream", + session->client_uid); + log_error(msg, server_state, STREAM_ERROR); + _return_json_failure(ws_conn, object, msg); + g_free (msg); + return -1; + } + /* Remove webrtcbin in session->mountpoint assigned to this client */ + if (!webrtc_mp_remove_element(pipeline->webrtc_mp, + (guint)client_uid)) { + gchar* msg = g_strdup_printf( + "ERROR: Removing webrtcbin element from " + "mountpoint for client %u", + session->client_uid); + log_error(msg, server_state, STREAM_ERROR); + _return_json_failure(ws_conn, object, msg); + g_free (msg); + return -1; + } + /* Update session */ + session->state = CLIENT_CONNECTED; + + /* Return success message */ + json_return = object; + json_object_set_boolean_member( + json_return, "success", TRUE); + + return_text = get_string_from_json_object(json_return); + + soup_websocket_connection_send_text(ws_conn, return_text); + g_free (return_text); + } + else { + gchar* msg = g_strdup_printf( + "ERROR: unknown command type %s ", cmd_type); + log_error(msg, server_state, STREAM_ERROR); + g_free (msg); + return -1; + } + } + else if (json_object_has_member(object, "sdp")) { + int ret; + GstSDPMessage *sdp; + const gchar *sdp_text, *sdptype; + GstWebRTCSessionDescription *answer; + + if (session->state != STREAM_NEGOTIATING) { + gchar* msg = g_strdup_printf( + "ERROR: trying to negotiate stream for session %d ", + client_uid); + log_error(msg, server_state, session->state); + g_free (msg); + return -1; + } + + child = json_object_get_object_member(object, "sdp"); + + if (!json_object_has_member(child, "type")) { + log_error("ERROR: received SDP without 'type'", + server_state, STREAM_ERROR); + return -1; + } + + sdptype = json_object_get_string_member(child, "type"); + if (g_strcmp0(sdptype, "answer") != 0) { + log_error("ERROR: SDP message not of `answer` type", + server_state, STREAM_ERROR); + return -1; + } + sdp_text = json_object_get_string_member(child, "sdp"); + + g_print("Received SDP answer:\n%s\n", sdp_text); + + ret = gst_sdp_message_new(&sdp); + g_assert_cmphex (ret, ==, GST_SDP_OK); + + ret = gst_sdp_message_parse_buffer((guint8*) sdp_text, + strlen (sdp_text), sdp); + if (ret != GST_SDP_OK) { + gchar* msg = g_strdup_printf( + "ERROR: parsing SDP message %s", sdp_text); + log_error(msg, server_state, STREAM_ERROR); + g_free (msg); + return -1; + } + + answer = gst_webrtc_session_description_new( + GST_WEBRTC_SDP_TYPE_ANSWER, sdp); + if (answer == NULL) { + log_error("ERROR: NULL SDP answer", + server_state, STREAM_ERROR); + return -1; + } + + if (!session->webrtcbin_ref) { + set_session_webrtcbinref(session, 
pipeline); + } + + if (session->webrtcbin_ref == NULL) { + gchar* msg = g_strdup_printf( + "ERROR: No webrtcbin found for session %u", + session->client_uid); + log_error(msg, server_state, STREAM_ERROR); + g_free (msg); + return -1; + } + + /* Set remote description on our pipeline */ + GstPromise *promise = gst_promise_new(); + g_signal_emit_by_name( + session->webrtcbin_ref, "set-remote-description", answer, promise); + gst_promise_interrupt(promise); + gst_promise_unref (promise); + + session->state = STREAM_STARTED; + + } + else if (json_object_has_member(object, "ice")) { + const gchar *candidate; + gint sdpmlineindex; + + child = json_object_get_object_member (object, "ice"); + candidate = json_object_get_string_member (child, "candidate"); + sdpmlineindex = json_object_get_int_member(child, "sdpMLineIndex"); + + if (!session->webrtcbin_ref) { + set_session_webrtcbinref(session, pipeline); + } + + if (session->webrtcbin_ref == NULL) { + log_error("ERROR: No webrtcbin found for session", + server_state, STREAM_ERROR); + return -1; + } + + /* Add ice candidate sent by remote peer */ + g_signal_emit_by_name(session->webrtcbin_ref, "add-ice-candidate", + sdpmlineindex, candidate); + } + else { + gchar* msg = g_strdup_printf( + "WARNING: Ignoring unknown JSON message:\n%s\n", text); + log_error(msg, server_state, STREAM_ERROR); + g_free (msg); + return 0; + } + g_object_unref (parser); + return 1; +} + + +static void on_server_closed( + SoupWebsocketConnection* conn, + camera_pipe_t* pipeline) +{ + UNUSED(conn); + UNUSED(pipeline); + server_state = SERVER_STATE_CLOSED; + log_error("Server connection closed", 0, 0); + /* Update sessions with NULL for ws_conn */ + for (size_t i = 0; i < MAX_WEBRTC_SESSIONS; i++) { + sessions[i].ws_conn_ref = NULL; + } +} + + +/* One mega message handler for our asynchronous calling mechanism */ +static void on_server_message( + SoupWebsocketConnection* conn, + SoupWebsocketDataType type, + GBytes* message, + camera_pipe_t* pipeline) +{ + gchar *text = NULL; + + switch (type) { + case SOUP_WEBSOCKET_DATA_BINARY: { + g_printerr("Received unknown binary message, ignoring\n"); + return; + } + case SOUP_WEBSOCKET_DATA_TEXT: { + gsize size; + const gchar *data = g_bytes_get_data(message, &size); + /* Convert to NULL-terminated string */ + text = g_strndup(data, size); + break; + } + default: { + log_error("Unknown websocket data type\n", + SERVER_STATE_ERROR, STREAM_ERROR); + goto out; + } + } + + /* Check that we can return messages to signalling server */ + if (soup_websocket_connection_get_state(conn) != + SOUP_WEBSOCKET_STATE_OPEN) { + log_error("No websocket connection\n", + SERVER_STATE_ERROR, STREAM_ERROR); + goto out; + } + + if (process_server_messages(text, pipeline, conn) == 0) { + process_json_messages(text, pipeline, conn); + } + out: + g_free(text); +} + + +static void on_server_connected( + SoupSession* session, + GAsyncResult* res, + camera_pipe_t* pipeline) +{ + GError *error = NULL; + + ws_conn = soup_session_websocket_connect_finish (session, res, &error); + if (error) { + log_error (error->message, SERVER_STATE_CONNECTION_ERROR, 0); + g_error_free(error); + return; + } + + /* Update sessions with ws_conn */ + for (size_t i = 0; i < MAX_WEBRTC_SESSIONS; i++) { + sessions[i].ws_conn_ref = ws_conn; + } + + g_assert_nonnull (ws_conn); + + server_state = SERVER_STATE_CONNECTED; + g_print("Connected to signalling server\n"); + + g_signal_connect(ws_conn, "closed", G_CALLBACK (on_server_closed), pipeline); + g_signal_connect(ws_conn, "message", 
G_CALLBACK (on_server_message), pipeline); + + /* Register with the server so it knows about us and can accept commands */ + register_with_server(); +} + + +/* + * Connect to the signalling server. This is the entrypoint for everything else. + */ +void webrtc_websocket_controller_setup(camera_pipe_t* pipeline) +{ + SoupLogger *logger; + SoupMessage *message; + SoupSession *soup_session; + const char *http_aliases[] = {"ws", NULL}; + + soup_session = soup_session_new_with_options( + SOUP_SESSION_SSL_STRICT, FALSE, + SOUP_SESSION_HTTP_ALIASES, http_aliases, NULL); + + logger = soup_logger_new(SOUP_LOGGER_LOG_BODY, -1); + soup_session_add_feature(soup_session, SOUP_SESSION_FEATURE(logger)); + g_object_unref(logger); + + message = soup_message_new(SOUP_METHOD_GET, server_url); + + g_print("Connecting to server...\n"); + + /* Once connected, we will register */ + soup_session_websocket_connect_async(soup_session, message, NULL, + NULL, NULL, (GAsyncReadyCallback) on_server_connected, pipeline); + server_state = SERVER_STATE_CONNECTING; +} + + +void webrtc_websocket_controller_teardown() +{ + if (ws_conn) { + if (soup_websocket_connection_get_state(ws_conn) == + SOUP_WEBSOCKET_STATE_OPEN) { + soup_websocket_connection_close(ws_conn, 1000, ""); + } + else { + g_object_unref(ws_conn); + } + } +} diff --git a/streamer/js/webrtc-demo.html b/streamer/js/webrtc-demo.html new file mode 100644 index 0000000..4e1b335 --- /dev/null +++ b/streamer/js/webrtc-demo.html @@ -0,0 +1,35 @@ + + + + + + + + + + + +
[webrtc-demo.html body markup not captured; visible text: "Status: unknown", "Our id is unknown"]
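Only fragments of the page body survive above; a minimal sketch of the markup that `webrtc-demo.js` expects (element IDs taken from the script; the title, layout and the `onload` hook are assumptions) would be:
```
<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8"/>
    <script src="webrtc-demo.js"></script>
  </head>
  <!-- websocketServerConnect() is assumed to be triggered from the page -->
  <body onload="websocketServerConnect()">
    <video id="stream" autoplay playsinline></video>
    <div>
      <button id="connect_btn">Connect</button>
      <button id="disconnect_btn">Disconnect</button>
    </div>
    <div>Status: <span id="status">unknown</span></div>
    <div>Our id is <span id="client_uid">unknown</span></div>
    <textarea id="text" rows="4" cols="40"></textarea>
  </body>
</html>
```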
+ + diff --git a/streamer/js/webrtc-demo.js b/streamer/js/webrtc-demo.js new file mode 100644 index 0000000..f977f04 --- /dev/null +++ b/streamer/js/webrtc-demo.js @@ -0,0 +1,358 @@ +/* + * Javascript app for negotiating and streaming a webrtc video stream + * with a GStreamer app. On connection to signalling server, a uid is + * requested and all communication is addressed via that uid + * + * Author: Nirbheek Chauhan + * Author: Aiden Jeffrey + */ + +// Set this to override the automatic detection in websocketServerConnect() +var ws_server; +var ws_port; +// Override with your own STUN servers if you want +var rtc_configuration = {iceServers: [{urls: "stun:stun.services.mozilla.com"}, + {urls: "stun:stun.l.google.com:19302"}]}; + +var connect_attempts = 0; +var client_uid = 0; +var peer_connection; +var send_channel; +var ws_conn; + +var connect_button; +var disconnect_button; + +var ice_candidate_queue = []; + + +/** Waits for document to load, then sets up listeners */ +document.addEventListener("DOMContentLoaded", function(event) { + + connect_button = document.getElementById("connect_btn"); + connect_button.addEventListener("click", function() { + cmd = {"command": {"type": "connect-to-mountpoint", + "data": null}, + "client_uid": client_uid} + ws_conn.send(JSON.stringify(cmd)); + }); + + disconnect_button = document.getElementById("disconnect_btn"); + disconnect_button.addEventListener("click", function() { + console.log("Disconnecting"); + // Request mountpoints from media server + cmd = {"command": {"type": "disconnect-mountpoint", + "data": null}, + "client_uid": client_uid} + ws_conn.send(JSON.stringify(cmd)); + }); +}); + + +function resetState() { + // This will call onServerClose() + ws_conn.close(); +} + +function handleIncomingError(error) { + setError("ERROR: " + error); + resetState(); +} + +function getVideoElement() { + return document.getElementById("stream"); +} + +function setStatus(text) { + console.log(text); + var span = document.getElementById("status") + // Don't set the status if it already contains an error + if (!span.classList.contains("error")) + span.textContent = text; +} + +function setError(text) { + console.error(text); + var span = document.getElementById("status") + span.textContent = text; + span.classList.add("error"); +} + +function resetVideo() { + // Reset the video element and stop showing the last received frame + var videoElement = getVideoElement(); + videoElement.pause(); + videoElement.src = ""; + videoElement.load(); +} + + +// SDP offer received from peer, set remote description and create an answer +function onIncomingSDP(sdp) { + peer_connection.setRemoteDescription(sdp).then(() => { + setStatus("Remote SDP set"); + if (sdp.type != "offer") + return; + setStatus("Got SDP offer"); + while (ice_candidate_queue.length > 0) { + var candidate = ice_candidate_queue.shift(); + peer_connection.addIceCandidate(candidate).catch(setError); + } + peer_connection.createAnswer() + .then(onLocalDescription).catch(setError); + }).catch(setError); +} + + +// Local description was set, send it to peer +function onLocalDescription(desc) { + console.log("Got local description: " + JSON.stringify(desc)); + peer_connection.setLocalDescription(desc).then(function() { + setStatus("Sending SDP answer"); + sdp = {"sdp": peer_connection.localDescription, + "client_uid": client_uid} + ws_conn.send(JSON.stringify(sdp)); + }); +} + + +// ICE candidate received from peer, add it to the peer connection +function onIncomingICE(ice) { + var candidate = new 
RTCIceCandidate(ice); + if(!peer_connection || !peer_connection.remoteDescription.type){ + // put candidate on queue + ice_candidate_queue.push(candidate); + } + else { + peer_connection.addIceCandidate(candidate).catch(setError); + } +} + + +function get_json_message(data) { + try { + msg = JSON.parse(data); + } catch (e) { + if (e instanceof SyntaxError) { + handleIncomingError("Error parsing incoming JSON: " + data); + } else { + handleIncomingError("Unknown error parsing response: " + data); + } + return null; + } + return msg; +} + + +function remove_options(selectbox) +{ + var i; + for (i = selectbox.options.length - 1 ; i >= 0 ; i--) + { + selectbox.remove(i); + } +} + + +/** Processes command data + * @param {json} msg - the json message to parse + */ +function process_command(msg) { + console.log("process_command: ", msg); + if (msg.command == null || !msg.success) { + console.error("Command failed!", msg) + return false; + } + else if (msg.command.type == "connect-to-mountpoint") { + console.log("CONNECT TO MOUNTPOINT:", msg); + } + else if (msg.command.type == "disconnect-mountpoint") { + console.log("DISCONNECT MOUNTPOINT:", msg); + close_peer_connection(); + } + else { + console.error( + "Command type not recognised! " + msg.command.type) + return false; + } +} + + +function onServerMessage(event) { + console.log("onServerMessage:" + event.data); + if (event.data.startsWith("ASSIGNED UID")) { + var fields = event.data.split(" "); + client_uid = Number(fields[2]); + document.getElementById("client_uid").textContent = client_uid; + setStatus("Registered with media server"); + return; + } + else { + if (event.data.startsWith("ERROR")) { + handleIncomingError(event.data); + return; + } + // Handle incoming JSON SDP and ICE messages + msg = get_json_message(event.data); + if (msg == null) { + return; + } + + // Incoming JSON signals the beginning of streaming session + if (msg.command != null) { + process_command(msg); + } + else if (!peer_connection) { + create_call(msg); + } + + if (msg.sdp != null) { + onIncomingSDP(msg.sdp); + } else if (msg.ice != null) { + onIncomingICE(msg.ice); + } else if (msg.command != null) { + console.log("Command mirror:", msg); + } else { + handleIncomingError("Unknown incoming JSON: " + msg); + } + } +} + + +function close_peer_connection() { + if (peer_connection) { + peer_connection.close(); + peer_connection = null; + } +} + + +function onServerClose(event) { + setStatus("Disconnected from server"); + resetVideo(); + + close_peer_connection(); + + // Reset after a second + window.setTimeout(websocketServerConnect, 1000); +} + +function onServerError(event) { + setError("Unable to connect to server, did you add an exception for the certificate?") + // Retry after 3 seconds + window.setTimeout(websocketServerConnect, 3000); +} + +function websocketServerConnect() { + connect_attempts++; + if (connect_attempts > 3) { + setError("Too many connection attempts, aborting. 
Refresh page to try again"); + return; + } + // Clear errors in the status span + var span = document.getElementById("status"); + span.classList.remove("error"); + span.textContent = ""; + // Fetch the peer id to use + ws_port = ws_port || "8443"; + if (window.location.protocol.startsWith ("file")) { + ws_server = ws_server || "127.0.0.1"; + } else if (window.location.protocol.startsWith ("http")) { + ws_server = ws_server || window.location.hostname; + } else { + throw new Error ("Don't know how to connect to the signalling server with uri" + window.location); + } + var ws_url = "ws://" + ws_server + ":" + ws_port; + setStatus("Connecting to server " + ws_url); + ws_conn = new WebSocket(ws_url); + /* When connected, immediately register with the server */ + ws_conn.addEventListener("open", (event) => { + ws_conn.send("REGISTER CLIENT"); + setStatus("Registering with signalling server"); + }); + ws_conn.addEventListener("error", onServerError); + ws_conn.addEventListener("message", onServerMessage); + ws_conn.addEventListener("close", onServerClose); +} + +function onRemoteTrack(event) { + if (getVideoElement().srcObject !== event.streams[0]) { + console.log("Incoming stream"); + getVideoElement().srcObject = event.streams[0]; + } +} + +function errorUserMediaHandler() { + setError("Browser doesn't support getUserMedia!"); +} + +const handleDataChannelOpen = (event) =>{ + console.log("dataChannel.OnOpen", event); +}; + +const handleDataChannelMessageReceived = (event) =>{ + console.log("dataChannel.OnMessage:", event, event.data.type); + + setStatus("Received data channel message"); + if (typeof event.data === "string" || event.data instanceof String) { + console.log("Incoming string message: " + event.data); + textarea = document.getElementById("text") + textarea.value = textarea.value + "\n" + event.data + } else { + console.log("Incoming data message"); + } + send_channel.send("Hi! 
(from browser)"); +}; + +const handleDataChannelError = (error) =>{ + console.log("dataChannel.OnError:", error); +}; + +const handleDataChannelClose = (event) =>{ + console.log("dataChannel.OnClose", event); +}; + +function onDataChannel(event) { + setStatus("Data channel created"); + let receiveChannel = event.channel; + receiveChannel.onopen = handleDataChannelOpen; + receiveChannel.onmessage = handleDataChannelMessageReceived; + receiveChannel.onerror = handleDataChannelError; + receiveChannel.onclose = handleDataChannelClose; +} + + +/** This should be called when sdp is sent from Media Server */ +function create_call(msg) { + console.log("CREATE_CALL") + // Reset connection attempts because we connected successfully + connect_attempts = 0; + + console.log("Creating RTCPeerConnection"); + + peer_connection = new RTCPeerConnection(rtc_configuration); + send_channel = peer_connection.createDataChannel("label", null); + send_channel.onopen = handleDataChannelOpen; + send_channel.onmessage = handleDataChannelMessageReceived; + send_channel.onerror = handleDataChannelError; + send_channel.onclose = handleDataChannelClose; + peer_connection.ondatachannel = onDataChannel; + peer_connection.ontrack = onRemoteTrack; + + if (!msg.sdp) { + console.log("WARNING: First message wasn't an SDP message!?", msg); + } + + peer_connection.onicecandidate = (event) => { + // We have a candidate, send it to the remote party with the + // same uuid + if (event.candidate == null) { + console.log("ICE Candidate was null, done"); + return; + } + ws_conn.send(JSON.stringify({"ice": event.candidate, + "client_uid": client_uid})); + }; + + setStatus("Created peer connection for call, waiting for SDP"); +}