Skip to content

Commit 9aa1ce9

Browse files
committed
wip4
1 parent 4130278 commit 9aa1ce9

File tree

2 files changed

+103
-104
lines changed

2 files changed

+103
-104
lines changed

lib/ex_webrtc_recorder/converter.ex

Lines changed: 94 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ defmodule ExWebRTC.Recorder.Converter do
188188

189189
recorder_manifest
190190
|> fetch_remote_files!(download_path, download_config)
191-
|> do_convert_manifest!(
191+
|> convert_manifest!(
192192
output_path,
193193
thumbnails_ctx,
194194
rid_allowed?,
@@ -275,7 +275,7 @@ defmodule ExWebRTC.Recorder.Converter do
275275
#
276276
# Conversion
277277

278-
defp do_convert_manifest!(
278+
defp convert_manifest!(
279279
manifest,
280280
output_path,
281281
thumbnails_ctx,
@@ -287,131 +287,110 @@ defmodule ExWebRTC.Recorder.Converter do
287287
# 1. Read tracks
288288
# 2. Convert tracks to WEBM files
289289
# 3. Mux WEBM files into a single file
290-
stream_map =
291-
Enum.reduce(manifest, %{}, fn {_id, track}, stream_map ->
292-
%{
293-
location: path,
294-
kind: kind,
295-
streams: streams,
296-
rid_map: rid_map
297-
} = track
298-
299-
file =
300-
with {:ok, %File.Stat{size: s}} <- File.stat(path),
301-
true <- s > 0,
302-
{:ok, file} <- File.open(path) do
303-
file
304-
else
305-
false ->
306-
raise "File #{path} is empty!"
290+
Enum.reduce(manifest, %{}, fn {_id, track}, stream_map ->
291+
%{
292+
location: path,
293+
kind: kind,
294+
streams: streams,
295+
rid_map: rid_map
296+
} = track
297+
298+
file =
299+
with {:ok, %File.Stat{size: s}} <- File.stat(path),
300+
true <- s > 0,
301+
{:ok, file} <- File.open(path) do
302+
file
303+
else
304+
false ->
305+
raise "File #{path} is empty!"
306+
307+
{:error, reason} ->
308+
raise "Unable to open #{path}: #{inspect(reason)}"
309+
end
307310

308-
{:error, reason} ->
309-
raise "Unable to open #{path}: #{inspect(reason)}"
310-
end
311+
packets =
312+
read_packets(
313+
file,
314+
Map.new(rid_map, fn {_rid, rid_idx} ->
315+
{rid_idx, %{store: %PacketStore{}, acc: [], packets_in_store: 0}}
316+
end),
317+
reorder_buffer_size
318+
)
311319

312-
packets =
313-
read_packets(
314-
file,
315-
Map.new(rid_map, fn {_rid, rid_idx} ->
316-
{rid_idx, %{store: %PacketStore{}, acc: [], packets_in_store: 0}}
317-
end),
318-
reorder_buffer_size
319-
)
320-
321-
track_contexts =
322-
case kind do
323-
:video ->
324-
rid_map = filter_rids(rid_map, rid_allowed?)
325-
get_video_track_contexts(rid_map, packets)
326-
327-
:audio ->
328-
get_audio_track_context(packets)
329-
end
320+
track_contexts =
321+
case kind do
322+
:video ->
323+
rid_map = filter_rids(rid_map, rid_allowed?)
324+
get_video_track_contexts(rid_map, packets)
330325

331-
stream_id = List.first(streams)
332-
333-
stream_map
334-
|> Map.put_new(stream_id, %{video: %{}, audio: %{}})
335-
|> Map.update!(stream_id, &Map.put(&1, kind, track_contexts))
336-
end)
337-
338-
Enum.flat_map(stream_map, fn {stream_id, %{video: v, audio: a}} ->
339-
cond do
340-
map_size(v) == 0 and map_size(a) == 0 ->
341-
raise "Stream #{stream_id} contains no tracks!"
342-
343-
map_size(v) == 0 ->
344-
%{nil => audio_ctx} = a
345-
output_file = Path.join(output_path, "#{stream_id}.webm")
346-
output_file |> Path.dirname() |> File.mkdir_p!()
347-
[{stream_id, process!(nil, audio_ctx, output_file, nil, nil)}]
348-
349-
true ->
350-
for {rid, video_ctx} <- v do
351-
output_id = if rid == nil, do: stream_id, else: "#{stream_id}_#{rid}"
352-
output_file = Path.join(output_path, "#{output_id}.webm")
353-
output_file |> Path.dirname() |> File.mkdir_p!()
354-
{output_id, process!(video_ctx, a[nil], output_file, thumbnails_ctx, reencode_ctx)}
355-
end
356-
end
326+
:audio ->
327+
get_audio_track_context(packets)
328+
end
329+
330+
stream_id = List.first(streams)
331+
332+
stream_map
333+
|> Map.put_new(stream_id, %{video: %{}, audio: %{}})
334+
|> Map.update!(stream_id, &Map.put(&1, kind, track_contexts))
357335
end)
336+
|> Enum.flat_map(&convert_stream!(&1, output_path, thumbnails_ctx, reencode_ctx))
358337
|> Map.new()
359338
end
360339

361-
defp process!(video_ctx, audio_ctx, output_file, thumbnails_ctx, reencode_ctx) do
362-
video_stream = if video_ctx, do: make_stream(self(), :video)
363-
audio_stream = if audio_ctx, do: make_stream(self(), :audio)
340+
defp convert_stream!({stream_id, %{video: video_ctxs, audio: audio_ctxs}}, output_path, thumbnails_ctx, reencode_ctx) do
341+
video_ctxs = if map_size(video_ctxs) == 0, do: %{nil: nil}, else: video_ctxs
364342

365-
# FIXME: Possible RC here: when the pipeline playback starts, the `Stream`s will start sending
366-
# `{:demand, kind, self()}` messages to this process.
367-
# There's no guarantee we'll start listening for these messages (in `emit_packets/3`) soon enough,
368-
# so we may reach a deadlock.
369-
# For now, it seems to work just fine, though.
370-
{:ok, _sup, pid} = Pipeline.start_link(video_stream, audio_stream, output_file)
371-
Process.monitor(pid)
343+
Enum.map(video_ctxs, fn {rid, video_ctx} ->
344+
output_id = if rid == nil, do: stream_id, else: "#{stream_id}_#{rid}"
372345

373-
emit_packets(pid, video_ctx[:packets] || [], audio_ctx[:packets] || [])
346+
output_file = Path.join(output_path, "#{output_id}.webm")
347+
output_file |> Path.dirname() |> File.mkdir_p!()
348+
349+
audio_ctx = audio_ctxs[nil]
350+
if video_ctx == nil and audio_ctx == nil, do: raise "Stream #{stream_id} contains no tracks!"
351+
352+
{output_id, convert_file!(video_ctx, audio_ctx, output_file, thumbnails_ctx, reencode_ctx)}
353+
end)
354+
end
355+
356+
defp convert_file!(video_ctx, audio_ctx, output_file, thumbnails_ctx, reencode_ctx) do
357+
{video_file, audio_file} = run_pipeline(video_ctx, audio_ctx, output_file)
374358

375359
cond do
376360
video_ctx != nil and audio_ctx != nil ->
377361
FFmpeg.combine_audio_video!(
378-
output_file <> "_video.webm",
362+
video_file,
379363
video_ctx.start_time,
380-
output_file <> "_audio.webm",
364+
audio_file,
381365
audio_ctx.start_time,
382366
output_file,
383367
reencode_ctx
384368
)
385369

386-
File.rm!(output_file <> "_video.webm")
387-
File.rm!(output_file <> "_audio.webm")
388-
389370
video_ctx != nil ->
390371
if reencode_ctx,
391-
do: FFmpeg.reencode_video!(output_file <> "_video.webm", output_file, reencode_ctx),
392-
else: File.cp!(output_file <> "_video.webm", output_file)
393-
394-
File.rm!(output_file <> "_video.webm")
372+
do: FFmpeg.reencode_video!(video_file, output_file, reencode_ctx),
373+
else: File.cp!(video_file, output_file)
395374

396375
true ->
397-
File.cp!(output_file <> "_audio.webm", output_file)
398-
File.rm!(output_file <> "_audio.webm")
376+
File.cp!(audio_file, output_file)
399377
end
400378

401-
stream_manifest = %{
379+
File.rm(video_file)
380+
File.rm(audio_file)
381+
382+
%{
402383
location: output_file,
403384
duration_seconds: FFmpeg.get_duration_in_seconds!(output_file)
404385
}
386+
|> maybe_generate_thumbnail!(output_file, thumbnails_ctx)
387+
end
405388

406-
stream_manifest =
407-
if thumbnails_ctx do
408-
thumbnail_file = FFmpeg.generate_thumbnail!(output_file, thumbnails_ctx)
409-
Map.put(stream_manifest, :thumbnail_location, thumbnail_file)
410-
else
411-
stream_manifest
412-
end
413-
414-
stream_manifest
389+
defp maybe_generate_thumbnail!(stream_manifest, _file, nil), do: stream_manifest
390+
defp maybe_generate_thumbnail!(stream_manifest, file, ctx) do
391+
file
392+
|> FFmpeg.generate_thumbnail!(ctx)
393+
|> then(&Map.put(stream_manifest, :thumbnail_location, &1))
415394
end
416395

417396
#
@@ -532,6 +511,23 @@ defmodule ExWebRTC.Recorder.Converter do
532511
#
533512
# Passing packets to the `Pipeline` process
534513

514+
defp run_pipeline(video_ctx, audio_ctx, output_file) do
515+
video_stream = if video_ctx, do: make_stream(self(), :video)
516+
audio_stream = if audio_ctx, do: make_stream(self(), :audio)
517+
518+
# FIXME: Possible RC here: when the pipeline playback starts, the `Stream`s will start sending
519+
# `{:demand, kind, self()}` messages to this process.
520+
# There's no guarantee we'll start listening for these messages (in `emit_packets/3`) soon enough,
521+
# so we may reach a deadlock.
522+
# For now, it seems to work just fine, though.
523+
{:ok, _sup, pid} = Pipeline.start_link(video_stream, audio_stream, output_file)
524+
Process.monitor(pid)
525+
526+
emit_packets(pid, video_ctx[:packets] || [], audio_ctx[:packets] || [])
527+
528+
{Pipeline.video_output(output_file), Pipeline.audio_output(output_file)}
529+
end
530+
535531
defp make_stream(pid, kind) do
536532
Stream.resource(
537533
fn -> pid end,

lib/ex_webrtc_recorder/converter/pipeline.ex

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,16 @@ defmodule ExWebRTC.Recorder.Converter.Pipeline do
5959
})
6060
end
6161

62+
def video_output(path), do: path <> "_video.webm"
63+
def audio_output(path), do: path <> "_audio.webm"
64+
6265
@impl true
6366
def handle_init(_ctx, opts) do
6467
# TODO: Support codecs other than VP8/Opus
6568
# TODO: Use a single muxer + sink once `Membrane.Matroska.Muxer` supports synchronizing AV
6669
spec =
67-
video_spec(opts.video_stream, opts.output_path <> "_video.webm") ++
68-
audio_spec(opts.audio_stream, opts.output_path <> "_audio.webm")
70+
video_spec(opts.video_stream, opts.output_path) ++
71+
audio_spec(opts.audio_stream, opts.output_path)
6972

7073
{[spec: spec], %{sinks_total: length(spec)}}
7174
end
@@ -89,21 +92,21 @@ defmodule ExWebRTC.Recorder.Converter.Pipeline do
8992

9093
defp video_spec(nil, _), do: []
9194

92-
defp video_spec(stream, location) do
95+
defp video_spec(stream, path) do
9396
[
9497
child(:video_source, %Source{stream: stream})
9598
|> child(:video_depayloader, %Membrane.RTP.DepayloaderBin{
9699
clock_rate: 90_000,
97100
depayloader: Membrane.RTP.VP8.Depayloader
98101
})
99102
|> child(:video_muxer, Membrane.Matroska.Muxer)
100-
|> child(:video_sink, %Membrane.File.Sink{location: location})
103+
|> child(:video_sink, %Membrane.File.Sink{location: video_output(path)})
101104
]
102105
end
103106

104107
defp audio_spec(nil, _), do: []
105108

106-
defp audio_spec(stream, location) do
109+
defp audio_spec(stream, path) do
107110
[
108111
child(:audio_source, %Source{stream: stream})
109112
|> child(:audio_depayloader, %Membrane.RTP.DepayloaderBin{
@@ -112,7 +115,7 @@ defmodule ExWebRTC.Recorder.Converter.Pipeline do
112115
})
113116
|> child(:opus_parser, Membrane.Opus.Parser)
114117
|> child(:audio_muxer, Membrane.Matroska.Muxer)
115-
|> child(:audio_sink, %Membrane.File.Sink{location: location})
118+
|> child(:audio_sink, %Membrane.File.Sink{location: audio_output(path)})
116119
]
117120
end
118121
end

0 commit comments

Comments
 (0)