Skip to content

Commit 708740e

Browse files
committed
Implement RTMP buffer on the session
1 parent 74873de commit 708740e

File tree

6 files changed

+1498
-753
lines changed

6 files changed

+1498
-753
lines changed

controllers/myStreamer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func GetMyStreamerHandler(c echo.Context) error {
5151

5252
session, _ := sessions.GetSession(streamKey)
5353

54-
if session != nil && session.Active {
54+
if session != nil && session.Running {
5555
myStreamer.Live = true
5656
}
5757

rtmpConnectionHandler.go

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"log"
77
"os"
88
"strings"
9-
"time"
109

1110
"github.com/geekgonecrazy/prismplus/sessions"
1211
"github.com/geekgonecrazy/prismplus/streamers"
@@ -41,62 +40,35 @@ func rtmpConnectionHandler(conn *rtmp.Conn) {
4140
os.Exit(1)
4241
}
4342

44-
// Mark session as active and stash headers for replay on new destinations
45-
session.ChangeState(true) // Mark active
43+
packet, err := conn.ReadPacket()
44+
if err != nil {
45+
fmt.Println("can't read packet:", err)
46+
}
47+
48+
session.SetBufferSize(packet)
49+
50+
// stash headers for replay on new destinations
4651
session.SetHeaders(streams)
4752

48-
log.Println("RTMP connection now active for session", key)
53+
go session.Run()
4954

50-
for _, destination := range session.Destinations {
51-
if err := destination.RTMP.WriteHeader(streams); err != nil {
52-
fmt.Println("can't write header to destination stream:", err)
53-
// os.Exit(1)
54-
}
55-
go destination.RTMP.Loop()
56-
}
55+
log.Println("RTMP connection now active for session", key)
5756

58-
lastTime := time.Now()
5957
for {
60-
if session.End {
61-
fmt.Printf("Ending session %s\n", key)
62-
break
63-
}
64-
6558
packet, err := conn.ReadPacket()
6659
if err != nil {
6760
fmt.Println("can't read packet:", err)
6861
break
6962
}
7063

71-
if time.Since(lastTime) > time.Second {
72-
fmt.Println("Duration:", packet.Time)
73-
lastTime = time.Now()
74-
}
75-
76-
for _, destination := range session.Destinations {
77-
destination.RTMP.WritePacket(packet)
78-
}
79-
}
80-
81-
session.ChangeState(false) // Mark inactive
82-
83-
for _, destination := range session.Destinations {
84-
err := destination.RTMP.Disconnect()
85-
if err != nil {
86-
fmt.Println(err)
87-
os.Exit(1)
88-
}
64+
session.RelayPacket(packet)
8965
}
9066

91-
if session.End {
92-
fmt.Printf("Session %s ended\n", key)
93-
// Make sure we are closed
94-
if err := conn.Close(); err != nil {
95-
log.Println(err)
96-
}
67+
session.StreamDisconnected()
68+
log.Println("Not processing any more. RTMP relaying stopped")
9769

98-
if err := sessions.DeleteSession(key); err != nil {
99-
log.Println(err)
100-
}
70+
// Make sure we are closed
71+
if err := conn.Close(); err != nil {
72+
log.Println(err)
10173
}
10274
}

sessions/session.go

Lines changed: 187 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"log"
77
"strings"
88
"sync"
9+
"time"
910

1011
"github.com/geekgonecrazy/prismplus/models"
1112
"github.com/geekgonecrazy/prismplus/rtmp"
@@ -17,16 +18,35 @@ var (
1718
ErrNotFound = errors.New("not found")
1819
)
1920

21+
type StreamState uint8
22+
23+
const (
24+
StreamCreated StreamState = iota
25+
StreamPacketReceived
26+
StreamBuffering
27+
StreamStreaming
28+
StreamDisconnected
29+
)
30+
2031
type Session struct {
21-
StreamerID int `json:"streamerId"`
22-
Key string `json:"key"`
23-
Destinations map[int]*Destination `json:"destinations"`
24-
Delay int `json:"delay"`
25-
NextDestinationID int `json:"nextDestinationId"`
26-
Active bool `json:"active"`
27-
End bool `json:"end"`
28-
StreamHeaders []av.CodecData `json:"streamHeaders"`
29-
_lock sync.Mutex // Might need if we allow modify
32+
StreamerID int `json:"streamerId"`
33+
Key string `json:"key"`
34+
Destinations map[int]*Destination `json:"destinations"`
35+
Delay int `json:"delay"`
36+
NextDestinationID int `json:"nextDestinationId"`
37+
Running bool `json:"running"`
38+
streamHeaders []av.CodecData `json:"streamHeaders"`
39+
streamStatus StreamState
40+
buffer chan av.Packet
41+
incomingDuration time.Duration
42+
previousIncomingDuration time.Duration
43+
bufferedDuration time.Duration
44+
outgoingDuration time.Duration
45+
bufferLength time.Duration
46+
lastPacketTime time.Time //lastPacketTime is currently only used to help with verbose logging, don't use for any logic
47+
discrepancySize time.Duration
48+
stop chan bool
49+
_lock sync.Mutex // Might need if we allow modify
3050
}
3151

3252
type Destination struct {
@@ -59,8 +79,8 @@ func (s *Session) AddDestination(destinationPayload models.Destination) error {
5979
RTMP: conn,
6080
}
6181

62-
if s.Active {
63-
if err := conn.WriteHeader(s.StreamHeaders); err != nil {
82+
if s.Running {
83+
if err := conn.WriteHeader(s.streamHeaders); err != nil {
6484
fmt.Println("can't write header:", err)
6585
// os.Exit(1)
6686
}
@@ -103,16 +123,159 @@ func (s *Session) RemoveDestination(id int) error {
103123
return nil
104124
}
105125

106-
func (s *Session) ChangeState(active bool) {
107-
s.Active = active
126+
func (s *Session) SetHeaders(streams []av.CodecData) {
127+
s.streamHeaders = streams
108128
}
109129

110-
func (s *Session) SetHeaders(streams []av.CodecData) {
111-
s.StreamHeaders = streams
130+
func (s *Session) Run() {
131+
// Prevent another routine from running
132+
if s.Running {
133+
return
134+
}
135+
136+
// TODO: Sessions once ended actually shouldn't be removed
137+
// Reset in case the session is reused
138+
s.lastPacketTime = time.Now()
139+
s.bufferedDuration = time.Duration(0)
140+
s.outgoingDuration = time.Duration(0)
141+
s.incomingDuration = time.Duration(0)
142+
143+
s.Running = true
144+
s.streamStatus = StreamBuffering
145+
log.Println("Session switched to buffering")
146+
147+
// Loop waiting for buffering to finish
148+
for {
149+
if s.streamStatus == StreamDisconnected {
150+
log.Println("Stopping buffer due to disconnect")
151+
break
152+
}
153+
154+
if time.Since(s.lastPacketTime) > time.Second {
155+
log.Printf("Buffered packets: %s/%s Buffered Packets: %d", s.bufferedDuration, s.bufferLength, len(s.buffer))
156+
s.lastPacketTime = time.Now()
157+
}
158+
159+
if s.incomingDuration >= s.bufferLength {
160+
log.Println("Session switched to streaming", len(s.buffer))
161+
s.streamStatus = StreamStreaming
162+
break
163+
}
164+
}
165+
166+
if s.streamStatus == StreamStreaming {
167+
log.Println("Finished Buffering")
168+
169+
log.Println("Connecting to destinations")
170+
for _, destination := range s.Destinations {
171+
if err := destination.RTMP.WriteHeader(s.streamHeaders); err != nil {
172+
fmt.Println("can't write header to destination stream:", err)
173+
// os.Exit(1)
174+
}
175+
176+
go destination.RTMP.Loop()
177+
}
178+
179+
log.Println("Beginning to stream to destinations")
180+
181+
streamedTime := time.Now()
182+
183+
Loop:
184+
for {
185+
186+
select {
187+
case <-s.stop:
188+
break Loop
189+
case packet := <-s.buffer:
190+
// Use the timing in the packet - the time streamed to figure out when next packet should go out
191+
time.Sleep(packet.Time - s.outgoingDuration)
192+
193+
// Verbose logging just to be able to see the state of the buffer
194+
if time.Since(streamedTime) > time.Second {
195+
log.Printf("Outgoing Packet Time: %s (idx %d); Incoming Packet Time: %s; Buffered up to Time: %s; Buffered Packets: %d", packet.Time, packet.Idx, s.incomingDuration, s.bufferedDuration, len(s.buffer))
196+
197+
streamedTime = time.Now()
198+
}
199+
200+
// Write packet out to all destinations
201+
for _, destination := range s.Destinations {
202+
destination.RTMP.WritePacket(packet)
203+
}
204+
205+
// Update with the time marker that has been already streamed
206+
s.outgoingDuration = packet.Time
207+
s.lastPacketTime = time.Now()
208+
}
209+
210+
if s.streamStatus == StreamDisconnected && len(s.buffer) == 0 {
211+
log.Println("Stream is disconnected and buffer is empty. Sending Stop Signal")
212+
s.stop <- true
213+
}
214+
}
215+
}
216+
217+
log.Println("Disconnecting from destinations")
218+
for _, destination := range s.Destinations {
219+
err := destination.RTMP.Disconnect()
220+
if err != nil {
221+
fmt.Println(err)
222+
}
223+
}
224+
225+
s.Running = false
226+
227+
log.Println("Attempt to self destruct session")
228+
if err := DeleteSession(s.Key); err != nil {
229+
log.Println(err)
230+
}
231+
}
232+
233+
func (s *Session) SetBufferSize(packet av.Packet) error {
234+
if s.Running && s.streamStatus != StreamCreated {
235+
log.Println("Can't set Packet size for buffer while session is running")
236+
s.RelayPacket(packet)
237+
return nil
238+
}
239+
240+
packetsNeeded := s.bufferLength / packet.CompositionTime
241+
size := int(packetsNeeded) * 4
242+
243+
log.Println("Packet size:", packet.CompositionTime)
244+
245+
log.Println("Setting buffer size to:", size)
246+
247+
// Need to figure out how to do this properly again with math
248+
s.buffer = make(chan av.Packet, 100000)
249+
250+
s.RelayPacket(packet)
251+
252+
return nil
253+
}
254+
255+
func (s *Session) RelayPacket(p av.Packet) {
256+
packetDuration := p.Time
257+
258+
if s.discrepancySize+packetDuration < s.previousIncomingDuration {
259+
s.discrepancySize = s.bufferedDuration
260+
log.Println("Discrepancy detected correcting", s.discrepancySize)
261+
}
262+
263+
s.incomingDuration = packetDuration
264+
265+
p.Time = s.discrepancySize + packetDuration
266+
267+
s.bufferedDuration = p.Time
268+
s.previousIncomingDuration = p.Time
269+
270+
s.buffer <- p
271+
}
272+
273+
func (s *Session) StreamDisconnected() {
274+
s.streamStatus = StreamDisconnected
112275
}
113276

114277
func (s *Session) EndSession() {
115-
s.End = true
278+
s.stop <- true
116279
}
117280

118281
func InitializeSessionStore() {
@@ -127,16 +290,21 @@ func CreateSession(sessionPayload models.SessionPayload) error {
127290
}
128291

129292
if existingSession != nil {
130-
return errors.New("Already Exists")
293+
return errors.New("session already Exists")
131294
}
132295

296+
bufferLength := time.Second * time.Duration(sessionPayload.Delay)
297+
133298
session := &Session{
299+
Delay: sessionPayload.Delay,
134300
StreamerID: sessionPayload.StreamerID,
135301
Key: sessionPayload.Key,
136302
Destinations: map[int]*Destination{},
137303
NextDestinationID: 0,
138-
Active: false,
139-
End: false,
304+
Running: false,
305+
buffer: make(chan av.Packet, 1),
306+
stop: make(chan bool, 1),
307+
bufferLength: bufferLength,
140308
}
141309

142310
_sessions[sessionPayload.Key] = session

0 commit comments

Comments
 (0)