Skip to content

Commit 5c771cd

Browse files
committed
Implement RTMP buffer on the session
1 parent 29832ee commit 5c771cd

File tree

6 files changed

+1474
-783
lines changed

6 files changed

+1474
-783
lines changed

controllers/myStreamer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func GetMyStreamerHandler(c echo.Context) error {
5151

5252
session, _ := sessions.GetSession(streamKey)
5353

54-
if session != nil && session.Active {
54+
if session != nil && session.Running {
5555
myStreamer.Live = true
5656
}
5757

rtmpConnectionHandler.go

Lines changed: 13 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"log"
77
"os"
88
"strings"
9-
"time"
109

1110
"github.com/geekgonecrazy/prismplus/sessions"
1211
"github.com/geekgonecrazy/prismplus/streamers"
@@ -41,77 +40,35 @@ func rtmpConnectionHandler(conn *rtmp.Conn) {
4140
os.Exit(1)
4241
}
4342

44-
// stash headers for replay on new destinations
45-
session.SetHeaders(streams)
46-
47-
// If a delay is specified then we need to inspect first packet and set the buffer size
48-
if session.Delay > 0 {
49-
packet, err := conn.ReadPacket()
50-
if err != nil {
51-
fmt.Println("can't read packet:", err)
52-
}
53-
54-
delaySeconds := time.Duration(session.Delay) * time.Second
55-
56-
packetsNeeded := delaySeconds / packet.CompositionTime
43+
packet, err := conn.ReadPacket()
44+
if err != nil {
45+
fmt.Println("can't read packet:", err)
46+
}
5747

58-
session.SetBufferSize(int(packetsNeeded))
48+
session.SetBufferSize(packet)
5949

60-
session.RelayPacket(packet)
61-
}
50+
// stash headers for replay on new destinations
51+
session.SetHeaders(streams)
6252

63-
go session.ForwardPackets()
53+
go session.Run()
6454

6555
log.Println("RTMP connection now active for session", key)
6656

67-
for _, destination := range session.Destinations {
68-
if err := destination.RTMP.WriteHeader(streams); err != nil {
69-
fmt.Println("can't write header to destination stream:", err)
70-
// os.Exit(1)
71-
}
72-
go destination.RTMP.Loop()
73-
}
74-
75-
lastTime := time.Now()
7657
for {
77-
if session.End {
78-
fmt.Printf("Ending session %s\n", key)
79-
break
80-
}
81-
8258
packet, err := conn.ReadPacket()
8359
if err != nil {
8460
fmt.Println("can't read packet:", err)
8561
break
8662
}
8763

8864
session.RelayPacket(packet)
89-
90-
if time.Since(lastTime) > time.Second {
91-
fmt.Println("Duration:", packet.Time)
92-
lastTime = time.Now()
93-
}
9465
}
9566

96-
session.ChangeState(false) // Mark inactive
67+
session.StreamDisconnected()
68+
log.Println("Not processing any more. RTMP relaying stopped")
9769

98-
for _, destination := range session.Destinations {
99-
err := destination.RTMP.Disconnect()
100-
if err != nil {
101-
fmt.Println(err)
102-
os.Exit(1)
103-
}
104-
}
105-
106-
if session.End {
107-
fmt.Printf("Session %s ended\n", key)
108-
// Make sure we are closed
109-
if err := conn.Close(); err != nil {
110-
log.Println(err)
111-
}
112-
113-
if err := sessions.DeleteSession(key); err != nil {
114-
log.Println(err)
115-
}
70+
// Make sure we are closed
71+
if err := conn.Close(); err != nil {
72+
log.Println(err)
11673
}
11774
}

sessions/session.go

Lines changed: 166 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"log"
77
"strings"
88
"sync"
9+
"time"
910

1011
"github.com/geekgonecrazy/prismplus/models"
1112
"github.com/geekgonecrazy/prismplus/rtmp"
@@ -17,19 +18,35 @@ var (
1718
ErrNotFound = errors.New("not found")
1819
)
1920

21+
type StreamState uint8
22+
23+
const (
24+
StreamCreated StreamState = iota
25+
StreamPacketReceived
26+
StreamBuffering
27+
StreamStreaming
28+
StreamDisconnected
29+
)
30+
2031
type Session struct {
21-
StreamerID int `json:"streamerId"`
22-
Key string `json:"key"`
23-
Destinations map[int]*Destination `json:"destinations"`
24-
Delay int `json:"delay"`
25-
NextDestinationID int `json:"nextDestinationId"`
26-
Active bool `json:"active"`
27-
End bool `json:"end"`
28-
StreamHeaders []av.CodecData `json:"streamHeaders"`
29-
buffer chan av.Packet
30-
desiredBufferSize int
31-
bufferFilled bool
32-
_lock sync.Mutex // Might need if we allow modify
32+
StreamerID int `json:"streamerId"`
33+
Key string `json:"key"`
34+
Destinations map[int]*Destination `json:"destinations"`
35+
Delay int `json:"delay"`
36+
NextDestinationID int `json:"nextDestinationId"`
37+
Running bool `json:"running"`
38+
streamHeaders []av.CodecData `json:"streamHeaders"`
39+
streamStatus StreamState
40+
buffer chan av.Packet
41+
incomingDuration time.Duration
42+
previousIncomingDuration time.Duration
43+
bufferedDuration time.Duration
44+
outgoingDuration time.Duration
45+
bufferLength time.Duration
46+
lastPacketTime time.Time //lastPacketTime is currently only used to help with verbose logging, don't use for any logic
47+
discrepancySize time.Duration
48+
stop chan bool
49+
_lock sync.Mutex // Might need if we allow modify
3350
}
3451

3552
type Destination struct {
@@ -62,8 +79,8 @@ func (s *Session) AddDestination(destinationPayload models.Destination) error {
6279
RTMP: conn,
6380
}
6481

65-
if s.Active {
66-
if err := conn.WriteHeader(s.StreamHeaders); err != nil {
82+
if s.Running {
83+
if err := conn.WriteHeader(s.streamHeaders); err != nil {
6784
fmt.Println("can't write header:", err)
6885
// os.Exit(1)
6986
}
@@ -106,51 +123,159 @@ func (s *Session) RemoveDestination(id int) error {
106123
return nil
107124
}
108125

109-
func (s *Session) ChangeState(active bool) {
110-
s.Active = active
111-
}
112-
113126
func (s *Session) SetHeaders(streams []av.CodecData) {
114-
s.StreamHeaders = streams
127+
s.streamHeaders = streams
115128
}
116129

117-
func (s *Session) ForwardPackets() {
118-
s.ChangeState(true)
130+
func (s *Session) Run() {
131+
// Prevent another routine from running
132+
if s.Running {
133+
return
134+
}
119135

120-
// Wait until buffer filled before we start flushing
136+
// TODO: Sessions once ended actually shouldn't be removed
137+
// Reset in case the session is reused
138+
s.lastPacketTime = time.Now()
139+
s.bufferedDuration = time.Duration(0)
140+
s.outgoingDuration = time.Duration(0)
141+
s.incomingDuration = time.Duration(0)
142+
143+
s.Running = true
144+
s.streamStatus = StreamBuffering
145+
log.Println("Session switched to buffering")
146+
147+
// Loop waiting for buffering to finish
121148
for {
122-
if !s.bufferFilled && len(s.buffer) < s.desiredBufferSize {
123-
s.bufferFilled = true
124-
continue
149+
if s.streamStatus == StreamDisconnected {
150+
log.Println("Stopping buffer due to disconnect")
151+
break
125152
}
126153

127-
break
154+
if time.Since(s.lastPacketTime) > time.Second {
155+
log.Printf("Buffered packets: %s/%s Buffered Packets: %d", s.bufferedDuration, s.bufferLength, len(s.buffer))
156+
s.lastPacketTime = time.Now()
157+
}
158+
159+
if s.incomingDuration >= s.bufferLength {
160+
log.Println("Session switched to streaming", len(s.buffer))
161+
s.streamStatus = StreamStreaming
162+
break
163+
}
128164
}
129165

130-
for p := range s.buffer {
166+
if s.streamStatus == StreamStreaming {
167+
log.Println("Finished Buffering")
168+
169+
log.Println("Connecting to destinations")
131170
for _, destination := range s.Destinations {
132-
destination.RTMP.WritePacket(p)
171+
if err := destination.RTMP.WriteHeader(s.streamHeaders); err != nil {
172+
fmt.Println("can't write header to destination stream:", err)
173+
// os.Exit(1)
174+
}
175+
176+
go destination.RTMP.Loop()
177+
}
178+
179+
log.Println("Beginning to stream to destinations")
180+
181+
streamedTime := time.Now()
182+
183+
Loop:
184+
for {
185+
186+
select {
187+
case <-s.stop:
188+
break Loop
189+
case packet := <-s.buffer:
190+
// Use the timing in the packet - the time streamed to figure out when next packet should go out
191+
time.Sleep(packet.Time - s.outgoingDuration)
192+
193+
// Verbose logging just to be able to see the state of the buffer
194+
if time.Since(streamedTime) > time.Second {
195+
log.Printf("Outgoing Packet Time: %s (idx %d); Incoming Packet Time: %s; Buffered up to Time: %s; Buffered Packets: %d", packet.Time, packet.Idx, s.incomingDuration, s.bufferedDuration, len(s.buffer))
196+
197+
streamedTime = time.Now()
198+
}
199+
200+
// Write packet out to all destinations
201+
for _, destination := range s.Destinations {
202+
destination.RTMP.WritePacket(packet)
203+
}
204+
205+
// Update with the time marker that has been already streamed
206+
s.outgoingDuration = packet.Time
207+
s.lastPacketTime = time.Now()
208+
}
209+
210+
if s.streamStatus == StreamDisconnected && len(s.buffer) == 0 {
211+
log.Println("Stream is disconnected and buffer is empty. Sending Stop Signal")
212+
s.stop <- true
213+
}
214+
}
215+
}
216+
217+
log.Println("Disconnecting from destinations")
218+
for _, destination := range s.Destinations {
219+
err := destination.RTMP.Disconnect()
220+
if err != nil {
221+
fmt.Println(err)
133222
}
134223
}
224+
225+
s.Running = false
226+
227+
log.Println("Attempt to self destruct session")
228+
if err := DeleteSession(s.Key); err != nil {
229+
log.Println(err)
230+
}
135231
}
136232

137-
func (s *Session) SetBufferSize(size int) error {
138-
if s.Active {
139-
log.Println("Can't set buffer size while session is active")
233+
func (s *Session) SetBufferSize(packet av.Packet) error {
234+
if s.Running && s.streamStatus != StreamCreated {
235+
log.Println("Can't set Packet size for buffer while session is running")
236+
s.RelayPacket(packet)
140237
return nil
141238
}
142239

143-
s.buffer = make(chan av.Packet, size)
240+
packetsNeeded := s.bufferLength / packet.CompositionTime
241+
size := int(packetsNeeded) * 4
242+
243+
log.Println("Packet size:", packet.CompositionTime)
244+
245+
log.Println("Setting buffer size to:", size)
246+
247+
// Need to figure out how to do this properly again with math
248+
s.buffer = make(chan av.Packet, 100000)
249+
250+
s.RelayPacket(packet)
144251

145252
return nil
146253
}
147254

148255
func (s *Session) RelayPacket(p av.Packet) {
256+
packetDuration := p.Time
257+
258+
if s.discrepancySize+packetDuration < s.previousIncomingDuration {
259+
s.discrepancySize = s.bufferedDuration
260+
log.Println("Discrepancy detected correcting", s.discrepancySize)
261+
}
262+
263+
s.incomingDuration = packetDuration
264+
265+
p.Time = s.discrepancySize + packetDuration
266+
267+
s.bufferedDuration = p.Time
268+
s.previousIncomingDuration = p.Time
269+
149270
s.buffer <- p
150271
}
151272

273+
func (s *Session) StreamDisconnected() {
274+
s.streamStatus = StreamDisconnected
275+
}
276+
152277
func (s *Session) EndSession() {
153-
s.End = true
278+
s.stop <- true
154279
}
155280

156281
func InitializeSessionStore() {
@@ -165,17 +290,21 @@ func CreateSession(sessionPayload models.SessionPayload) error {
165290
}
166291

167292
if existingSession != nil {
168-
return errors.New("Already Exists")
293+
return errors.New("session already Exists")
169294
}
170295

296+
bufferLength := time.Second * time.Duration(sessionPayload.Delay)
297+
171298
session := &Session{
172-
buffer: make(chan av.Packet, 2),
299+
Delay: sessionPayload.Delay,
173300
StreamerID: sessionPayload.StreamerID,
174301
Key: sessionPayload.Key,
175302
Destinations: map[int]*Destination{},
176303
NextDestinationID: 0,
177-
Active: false,
178-
End: false,
304+
Running: false,
305+
buffer: make(chan av.Packet, 1),
306+
stop: make(chan bool, 1),
307+
bufferLength: bufferLength,
179308
}
180309

181310
_sessions[sessionPayload.Key] = session

0 commit comments

Comments
 (0)