Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
vendor/

prism
prismplus
dist/
.envrc
data.bbolt
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ npm install
npm run build
```

3. Build backend. The backend is written in golang.
3. Build backend (ensure you are in the project's root directory: `cd ..`). The backend is written in golang

```
go get
Expand Down
2 changes: 1 addition & 1 deletion controllers/myStreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func GetMyStreamerHandler(c echo.Context) error {

session, _ := sessions.GetSession(streamKey)

if session != nil && session.Active {
if session != nil && session.Running {
myStreamer.Live = true
}

Expand Down
1 change: 1 addition & 0 deletions models/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ type SessionPayload struct {
StreamerID int `json:"streamerId"`
Key string `json:"key"`
Destinations []Destination `json:"destinations"`
Delay int `json:"delay"`
}
2 changes: 2 additions & 0 deletions models/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import "time"
type StreamerCreatePayload struct {
Name string `json:"name"`
StreamKey string `json:"streamKey"`
Delay int `json:"delay"`
}

type Streamer struct {
ID int `json:"id"`
Name string `json:"name"`
StreamKey string `json:"streamKey"`
Delay int `json:"delay"`

NextDestinationID int `json:"nextDestinationId"`
Destinations []Destination `json:"destinations"`
Expand Down
60 changes: 16 additions & 44 deletions rtmpConnectionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log"
"os"
"strings"
"time"

"github.com/geekgonecrazy/prismplus/sessions"
"github.com/geekgonecrazy/prismplus/streamers"
Expand Down Expand Up @@ -41,62 +40,35 @@ func rtmpConnectionHandler(conn *rtmp.Conn) {
os.Exit(1)
}

// Mark session as active and stash headers for replay on new destinations
session.ChangeState(true) // Mark active
packet, err := conn.ReadPacket()
if err != nil {
fmt.Println("can't read packet:", err)
}

session.SetBufferSize(packet)

// stash headers for replay on new destinations
session.SetHeaders(streams)

log.Println("RTMP connection now active for session", key)
go session.Run()

for _, destination := range session.Destinations {
if err := destination.RTMP.WriteHeader(streams); err != nil {
fmt.Println("can't write header to destination stream:", err)
// os.Exit(1)
}
go destination.RTMP.Loop()
}
log.Println("RTMP connection now active for session", key)

lastTime := time.Now()
for {
if session.End {
fmt.Printf("Ending session %s\n", key)
break
}

packet, err := conn.ReadPacket()
if err != nil {
fmt.Println("can't read packet:", err)
break
}

if time.Since(lastTime) > time.Second {
fmt.Println("Duration:", packet.Time)
lastTime = time.Now()
}

for _, destination := range session.Destinations {
destination.RTMP.WritePacket(packet)
}
}

session.ChangeState(false) // Mark inactive

for _, destination := range session.Destinations {
err := destination.RTMP.Disconnect()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
session.RelayPacket(packet)
}

if session.End {
fmt.Printf("Session %s ended\n", key)
// Make sure we are closed
if err := conn.Close(); err != nil {
log.Println(err)
}
session.StreamDisconnected()
log.Println("Not processing any more. RTMP relaying stopped")

if err := sessions.DeleteSession(key); err != nil {
log.Println(err)
}
// Make sure we are closed
if err := conn.Close(); err != nil {
log.Println(err)
}
}
206 changes: 188 additions & 18 deletions sessions/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"strings"
"sync"
"time"

"github.com/geekgonecrazy/prismplus/models"
"github.com/geekgonecrazy/prismplus/rtmp"
Expand All @@ -17,15 +18,35 @@ var (
ErrNotFound = errors.New("not found")
)

type StreamState uint8

const (
StreamCreated StreamState = iota
StreamPacketReceived
StreamBuffering
StreamStreaming
StreamDisconnected
)

type Session struct {
StreamerID int `json:"streamerId"`
Key string `json:"key"`
Destinations map[int]*Destination `json:"destinations"`
NextDestinationID int `json:"nextDestinationId"`
Active bool `json:"active"`
End bool `json:"end"`
StreamHeaders []av.CodecData `json:"streamHeaders"`
_lock sync.Mutex // Might need if we allow modify
StreamerID int `json:"streamerId"`
Key string `json:"key"`
Destinations map[int]*Destination `json:"destinations"`
Delay int `json:"delay"`
NextDestinationID int `json:"nextDestinationId"`
Running bool `json:"running"`
streamHeaders []av.CodecData `json:"streamHeaders"`
streamStatus StreamState
buffer chan av.Packet
incomingDuration time.Duration
previousIncomingDuration time.Duration
bufferedDuration time.Duration
outgoingDuration time.Duration
bufferLength time.Duration
lastPacketTime time.Time //lastPacketTime is currently only used to help with verbose logging, don't use for any logic
discrepancySize time.Duration
stop chan bool
_lock sync.Mutex // Might need if we allow modify
}

type Destination struct {
Expand Down Expand Up @@ -58,8 +79,8 @@ func (s *Session) AddDestination(destinationPayload models.Destination) error {
RTMP: conn,
}

if s.Active {
if err := conn.WriteHeader(s.StreamHeaders); err != nil {
if s.Running {
if err := conn.WriteHeader(s.streamHeaders); err != nil {
fmt.Println("can't write header:", err)
// os.Exit(1)
}
Expand Down Expand Up @@ -102,16 +123,159 @@ func (s *Session) RemoveDestination(id int) error {
return nil
}

func (s *Session) ChangeState(active bool) {
s.Active = active
func (s *Session) SetHeaders(streams []av.CodecData) {
s.streamHeaders = streams
}

func (s *Session) SetHeaders(streams []av.CodecData) {
s.StreamHeaders = streams
func (s *Session) Run() {
// Prevent another routine from running
if s.Running {
return
}

// TODO: Sessions once ended actually shouldn't be removed
// Reset in case the session is reused
s.lastPacketTime = time.Now()
s.bufferedDuration = time.Duration(0)
s.outgoingDuration = time.Duration(0)
s.incomingDuration = time.Duration(0)

s.Running = true
s.streamStatus = StreamBuffering
log.Println("Session switched to buffering")

// Loop waiting for buffering to finish
for {
if s.streamStatus == StreamDisconnected {
log.Println("Stopping buffer due to disconnect")
break
}

if time.Since(s.lastPacketTime) > time.Second {
log.Printf("Buffered packets: %s/%s Buffered Packets: %d", s.bufferedDuration, s.bufferLength, len(s.buffer))
s.lastPacketTime = time.Now()
}

if s.incomingDuration >= s.bufferLength {
log.Println("Session switched to streaming", len(s.buffer))
s.streamStatus = StreamStreaming
break
}
}

if s.streamStatus == StreamStreaming {
log.Println("Finished Buffering")

log.Println("Connecting to destinations")
for _, destination := range s.Destinations {
if err := destination.RTMP.WriteHeader(s.streamHeaders); err != nil {
fmt.Println("can't write header to destination stream:", err)
// os.Exit(1)
}

go destination.RTMP.Loop()
}

log.Println("Beginning to stream to destinations")

streamedTime := time.Now()

Loop:
for {

select {
case <-s.stop:
break Loop
case packet := <-s.buffer:
// Use the timing in the packet - the time streamed to figure out when next packet should go out
time.Sleep(packet.Time - s.outgoingDuration)

// Verbose logging just to be able to see the state of the buffer
if time.Since(streamedTime) > time.Second {
log.Printf("Outgoing Packet Time: %s (idx %d); Incoming Packet Time: %s; Buffered up to Time: %s; Buffered Packets: %d", packet.Time, packet.Idx, s.incomingDuration, s.bufferedDuration, len(s.buffer))

streamedTime = time.Now()
}

// Write packet out to all destinations
for _, destination := range s.Destinations {
destination.RTMP.WritePacket(packet)
}

// Update with the time marker that has been already streamed
s.outgoingDuration = packet.Time
s.lastPacketTime = time.Now()
}

if s.streamStatus == StreamDisconnected && len(s.buffer) == 0 {
log.Println("Stream is disconnected and buffer is empty. Sending Stop Signal")
s.stop <- true
}
}
}

log.Println("Disconnecting from destinations")
for _, destination := range s.Destinations {
err := destination.RTMP.Disconnect()
if err != nil {
fmt.Println(err)
}
}

s.Running = false

log.Println("Attempt to self destruct session")
if err := DeleteSession(s.Key); err != nil {
log.Println(err)
}
}

func (s *Session) SetBufferSize(packet av.Packet) error {
if s.Running && s.streamStatus != StreamCreated {
log.Println("Can't set Packet size for buffer while session is running")
s.RelayPacket(packet)
return nil
}

packetsNeeded := s.bufferLength / packet.CompositionTime
size := int(packetsNeeded) * 4

log.Println("Packet size:", packet.CompositionTime)

log.Println("Setting buffer size to:", size)

// Need to figure out how to do this properly again with math
s.buffer = make(chan av.Packet, 100000)

s.RelayPacket(packet)

return nil
}

func (s *Session) RelayPacket(p av.Packet) {
packetDuration := p.Time

if s.discrepancySize+packetDuration < s.previousIncomingDuration {
s.discrepancySize = s.bufferedDuration
log.Println("Discrepancy detected correcting", s.discrepancySize)
}

s.incomingDuration = packetDuration

p.Time = s.discrepancySize + packetDuration

s.bufferedDuration = p.Time
s.previousIncomingDuration = p.Time

s.buffer <- p
}

func (s *Session) StreamDisconnected() {
s.streamStatus = StreamDisconnected
}

func (s *Session) EndSession() {
s.End = true
s.stop <- true
}

func InitializeSessionStore() {
Expand All @@ -126,16 +290,21 @@ func CreateSession(sessionPayload models.SessionPayload) error {
}

if existingSession != nil {
return errors.New("Already Exists")
return errors.New("session already Exists")
}

bufferLength := time.Second * time.Duration(sessionPayload.Delay)

session := &Session{
Delay: sessionPayload.Delay,
StreamerID: sessionPayload.StreamerID,
Key: sessionPayload.Key,
Destinations: map[int]*Destination{},
NextDestinationID: 0,
Active: false,
End: false,
Running: false,
buffer: make(chan av.Packet, 1),
stop: make(chan bool, 1),
bufferLength: bufferLength,
}

_sessions[sessionPayload.Key] = session
Expand All @@ -154,6 +323,7 @@ func CreateSessionFromStreamer(streamer models.Streamer) (*Session, error) {
StreamerID: streamer.ID,
Key: streamer.StreamKey,
Destinations: streamer.Destinations,
Delay: streamer.Delay,
}

if err := CreateSession(sessionPayload); err != nil {
Expand Down
Loading