Skip to content

Commit

Permalink
srtla2: now with receiver support for multiple connections
Browse files Browse the repository at this point in the history
It started with that relatively simple aim, but in the process I've
also refactored, cleaned up and rewrote much of the code

WIP. Probably only small tweaks at this point. No more force pushing
in the v2 branch

TODO:
* more testing, particularly on x86
  • Loading branch information
rationalsa committed Feb 4, 2021
1 parent 2207c3d commit 6f3925e
Show file tree
Hide file tree
Showing 5 changed files with 1,130 additions and 328 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,27 @@ How does it work?
The core idea is that srtla keeps track of the number of packets in flight (sent but unacknowledged) for each link, together with a dynamic window size that tracks the capacity of each link - similarly to TCP congestion control. These are used together to balance the traffic through each link proportionally to its capacity. However, note that no congestion control is applied.

This assumes that you're familiar with the [SRT spec](https://tools.ietf.org/html/draft-sharabayko-mops-srt-00). TODO: technical description in more detail.

**srtla v2**

The main improvement in srtla v2 is that it supports multiple *srtla senders* connecting to a single *srtla receiver* by establishing *connection groups* - note that these are different from the experimental *socket groups* in SRT. To support this feature, I've introduced a 2-phase connection registration phase.

Normal registration:

Sender (conn 0): `SRTLA_REG1(sender_id = SRTLA_ID_LEN bytes sender-generated random id)`
Receiver: `SRTLA_REG2(full_id = sender_id with the last SRTLA_ID_LEN/2 bytes replaced with receiver-generated values)`
Sender (conn 0): `SRTLA_REG2(full_id)`
Receiver: `SRTLA_REG3`
[...]
Sender (conn n): `SRTLA_REG2(full_id)`
Receiver: `SRTLA_REG3`


Error responses are only sent from the *receiver*. If the *sender* encounters an error, it should just abandon the relevant *connection group* or *connection*, and it will be garbage collected on the receiver side after some time. Possible error responses are sent after receiving a `SRTLA_REG1` or `SRTLA_REG2` request.

* `SRTLA_REG_ERR` - Can be sent in response to any `SRTLA_REG` command. Operation temporarily failed, back off and retry later.
* `SRTLA_REG_NAK` - Sent in response to `SRTLA_REG1`. Operation refused, give up. Either incompatible or access denied. A human-readable error message may be appended after the header.
* `SRTLA_REG_NGP` - Sent in response to `SRTLA_REG2` with an invalid ID. Register the group again with `SRTLA_REG1`

TODO:
* clean up the printed messages, separate in error / verbose / debugging
68 changes: 49 additions & 19 deletions common.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
srtla - SRT transport proxy with link aggregation
Copyright (C) 2020 BELABOX project
Copyright (C) 2020-2021 BELABOX project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
Expand All @@ -21,6 +21,7 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>

#include "common.h"

Expand All @@ -29,6 +30,18 @@ void exit_help() {
exit(EXIT_FAILURE);
}

#define ADDR_BUF_SZ 50
char _global_addr_buf[ADDR_BUF_SZ];
const char *print_addr(struct sockaddr *addr) {
struct sockaddr_in *ain = (struct sockaddr_in *)addr;
return inet_ntop(ain->sin_family, &ain->sin_addr, _global_addr_buf, ADDR_BUF_SZ);
}

int port_no(struct sockaddr *addr) {
struct sockaddr_in *ain = (struct sockaddr_in *)addr;
return ntohs(ain->sin_port);
}

int parse_ip(struct sockaddr_in *addr, char *ip_str) {
in_addr_t ip = inet_addr(ip_str);
if (ip == -1) return -1;
Expand All @@ -46,41 +59,58 @@ int parse_port(char *port_str) {
return port;
}

extern fd_set active_fds;
extern int max_act_fd;
int add_active_fd(int fd) {
if (fd < 0) return -1;

if (fd > max_act_fd) max_act_fd = fd;
FD_SET(fd, &active_fds);

int get_seconds(time_t *s) {
struct timespec ts;
int ret = clock_gettime(CLOCK_MONOTONIC_COARSE, &ts);
if (ret != 0) return -1;
*s = ts.tv_sec;
return 0;
}

int remove_active_fd(int fd) {
if (fd < 0) return -1;

FD_CLR(fd, &active_fds);
int get_ms(uint64_t *ms) {
struct timespec ts;
int ret = clock_gettime(CLOCK_MONOTONIC_COARSE, &ts);
if (ret != 0) return -1;
*ms = ((uint64_t)(ts.tv_sec)) * 1000 + ((uint64_t)(ts.tv_nsec)) / 1000 / 1000;

return 0;
}

int32_t get_srt_sn(void *pkt) {
int32_t get_srt_sn(void *pkt, int n) {
if (n < 4) return -1;

uint32_t sn = be32toh(*((uint32_t *)pkt));
if ((sn & (1 << 31)) == 0) {
return (int32_t)sn;
}

return -1;
}

uint16_t get_srt_type(void *pkt) {
uint16_t get_srt_type(void *pkt, int n) {
if (n < 2) return 0;
return be16toh(*((uint16_t *)pkt));
}

int is_srt_ack(void *pkt) {
return get_srt_type(pkt) == SRT_TYPE_ACK;
int is_srt_ack(void *pkt, int n) {
return get_srt_type(pkt, n) == SRT_TYPE_ACK;
}

int is_srtla_keepalive(void *pkt, int n) {
return get_srt_type(pkt, n) == SRTLA_TYPE_KEEPALIVE;
}

int is_srtla_reg1(void *pkt, int len) {
if (len != SRTLA_TYPE_REG1_LEN) return 0;
return get_srt_type(pkt, len) == SRTLA_TYPE_REG1;
}

int is_srtla_reg2(void *pkt, int len) {
if (len != SRTLA_TYPE_REG2_LEN) return 0;
return get_srt_type(pkt, len) == SRTLA_TYPE_REG2;
}

int is_srtla_keepalive(void *pkt) {
return get_srt_type(pkt) == SRTLA_TYPE_KEEPALIVE;
int is_srtla_reg3(void *pkt, int len) {
if (len != SRTLA_TYPE_REG3_LEN) return 0;
return get_srt_type(pkt, len) == SRTLA_TYPE_REG3;
}
87 changes: 80 additions & 7 deletions common.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
srtla - SRT transport proxy with link aggregation
Copyright (C) 2020 BELABOX project
Copyright (C) 2020-2021 BELABOX project
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
Expand All @@ -18,18 +18,91 @@

#define MTU 1500

#define SRT_TYPE_HANDSHAKE 0x8000
#define SRT_TYPE_ACK 0x8002
#define SRT_TYPE_NAK 0x8003
#define SRT_TYPE_SHUTDOWN 0x8005

#define SRTLA_TYPE_KEEPALIVE 0x9000
#define SRTLA_TYPE_ACK 0x9100
#define SRTLA_TYPE_REG1 0x9200
#define SRTLA_TYPE_REG2 0x9201
#define SRTLA_TYPE_REG3 0x9202
#define SRTLA_TYPE_REG_ERR 0x9210
#define SRTLA_TYPE_REG_NGP 0x9211
#define SRTLA_TYPE_REG_NAK 0x9212

#define SRT_MIN_LEN 16

#define SRTLA_ID_LEN 256
#define SRTLA_TYPE_REG1_LEN (2 + (SRTLA_ID_LEN))
#define SRTLA_TYPE_REG2_LEN (2 + (SRTLA_ID_LEN))
#define SRTLA_TYPE_REG3_LEN 2

typedef struct __attribute__((__packed__)) {
uint16_t type;
uint16_t subtype;
uint32_t info;
uint32_t timestamp;
uint32_t dest_id;
} srt_header_t;

typedef struct __attribute__((__packed__)) {
srt_header_t header;
uint32_t version;
uint16_t enc_field;
uint16_t ext_field;
uint32_t initial_seq;
uint32_t mtu;
uint32_t mfw;
uint32_t handshake_type;
uint32_t source_id;
uint32_t syn_cookie;
char peer_ip[16];
} srt_handshake_t;

#define LOG_NONE 0 // prints only fatal errors
#define LOG_ERR 1 // prints errors we can tolerate
#define LOG_INFO 2 // prints informational messages
#define LOG_DEBUG 3 // prints potentially verbose messages about the internal workings

#define LOG_LEVEL LOG_INFO

#if LOG_LEVEL >= LOG_DEBUG
#define debug(...) fprintf(stderr, __VA_ARGS__)
#else
#define debug(...)
#endif

#if LOG_LEVEL >= LOG_INFO
#define info(...) fprintf(stderr, __VA_ARGS__)
#else
#define info(...)
#endif

#if LOG_LEVEL >= LOG_ERR
#define err(...) fprintf(stderr, __VA_ARGS__)
#else
#define err(...)
#endif

void print_help();
void exit_help();

int get_seconds(time_t *s);
int get_ms(uint64_t *ms);

const char *print_addr(struct sockaddr *addr);
int port_no(struct sockaddr *addr);
int parse_ip(struct sockaddr_in *addr, char *ip_str);
int parse_port(char *port_str);
int add_active_fd(int fd);
int remove_active_fd(int fd);
int32_t get_srt_sn(void *pkt);
uint16_t get_srt_type(void *pkt);
int is_srt_ack(void *pkt);
int is_srtla_keepalive(void *pkt);

int32_t get_srt_sn(void *pkt, int n);
uint16_t get_srt_type(void *pkt, int n);
int is_srt_ack(void *pkt, int n);
int is_srt_shutdown(void *pkt, int n);

int is_srtla_keepalive(void *pkt, int len);
int is_srtla_reg1(void *pkt, int len);
int is_srtla_reg2(void *pkt, int len);
int is_srtla_reg3(void *pkt, int len);
Loading

0 comments on commit 6f3925e

Please sign in to comment.