Skip to content

Commit c733abf

Browse files
committed
Add a manifest to files that are checked to confirm they’ve been uploaded successfully.
1 parent c0f0530 commit c733abf

File tree

2 files changed

+68
-34
lines changed

2 files changed

+68
-34
lines changed

lib/ex_webrtc_recorder/s3.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ defmodule ExWebRTC.Recorder.S3 do
2525
* `:bucket_name` (required) - Name of bucket objects will be uploaded to.
2626
* `:base_path` - S3 path prefix used for objects uploaded to the bucket. `""` by default.
2727
"""
28-
@type upload_option :: {:bucket_name, String.t()} | {:base_path, String.t()}
28+
@type upload_option ::
29+
{:bucket_name, String.t()} | {:base_path, String.t()} | {:upload_manifest, boolean()}
2930

3031
@type upload_config :: [upload_option() | override_option()]
3132

lib/ex_webrtc_recorder/s3/upload_handler.ex

Lines changed: 66 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ if Code.ensure_loaded?(ExAws.S3) do
1414
s3_config_overrides: keyword(),
1515
bucket_name: String.t(),
1616
base_path: Path.t(),
17-
tasks: %{ref() => manifest()}
17+
tasks: %{ref() => manifest()},
18+
upload_manifest: boolean()
1819
}
1920

20-
@enforce_keys [:s3_config_overrides, :bucket_name, :base_path]
21+
@enforce_keys [:s3_config_overrides, :bucket_name, :base_path, :upload_manifest]
2122
defstruct @enforce_keys ++ [tasks: %{}]
2223

2324
@spec new(keyword()) :: t()
@@ -27,24 +28,32 @@ if Code.ensure_loaded?(ExAws.S3) do
2728

2829
base_path = Keyword.get(config, :base_path, "")
2930
{:ok, _test_path} = base_path |> Path.join("a") |> Recorder.S3.Utils.validate_s3_path()
31+
upload_manifest = Keyword.get(config, :upload_manifest, false)
32+
3033
s3_config_overrides = Keyword.drop(config, [:bucket_name, :base_path])
3134

3235
%__MODULE__{
3336
bucket_name: bucket_name,
3437
base_path: base_path,
38+
upload_manifest: upload_manifest,
3539
s3_config_overrides: s3_config_overrides
3640
}
3741
end
3842

3943
@spec spawn_task(t(), manifest()) :: {ref(), t()}
4044
def spawn_task(
41-
%__MODULE__{bucket_name: bucket_name, s3_config_overrides: s3_config_overrides} =
45+
%__MODULE__{
46+
bucket_name: bucket_name,
47+
base_path: base_path,
48+
upload_manifest: upload_manifest,
49+
s3_config_overrides: s3_config_overrides
50+
} =
4251
handler,
4352
manifest
4453
) do
4554
s3_paths =
4655
Map.new(manifest, fn {id, %{location: path}} ->
47-
s3_path = path |> Path.basename() |> then(&Path.join(handler.base_path, &1))
56+
s3_path = path |> Path.basename() |> then(&Path.join(base_path, &1))
4857

4958
{id, s3_path}
5059
end)
@@ -56,21 +65,19 @@ if Code.ensure_loaded?(ExAws.S3) do
5665
{id, %{object_data | location: location}}
5766
end)
5867

59-
manifest_s3_path =
60-
Path.join(handler.base_path, "manifest.json")
61-
62-
Recorder.S3.Utils.upload_manifest(
63-
download_manifest,
64-
bucket_name,
65-
manifest_s3_path,
66-
s3_config_overrides
67-
)
68-
6968
# FIXME: this links, ideally we should use `async_nolink` instead
7069
# but this may require a slight change of the current UploadHandler logic
7170
task =
7271
Task.Supervisor.async(ExWebRTC.Recorder.TaskSupervisor, fn ->
73-
upload(manifest, bucket_name, s3_paths, s3_config_overrides)
72+
upload(
73+
manifest,
74+
bucket_name,
75+
base_path,
76+
s3_paths,
77+
s3_config_overrides,
78+
upload_manifest,
79+
download_manifest
80+
)
7481
end)
7582

7683
{task.ref,
@@ -105,28 +112,54 @@ if Code.ensure_loaded?(ExAws.S3) do
105112
{result, manifest, %__MODULE__{handler | tasks: tasks}}
106113
end
107114

108-
defp upload(manifest, bucket_name, s3_paths, s3_config_overrides) do
109-
Map.new(manifest, fn {id, %{location: path}} ->
110-
%{^id => s3_path} = s3_paths
111-
Logger.debug("Uploading `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`")
115+
defp upload(
116+
manifest,
117+
bucket_name,
118+
base_path,
119+
s3_paths,
120+
s3_config_overrides,
121+
upload_manifest,
122+
download_manifest
123+
) do
124+
results =
125+
Map.new(manifest, fn {id, %{location: path}} ->
126+
%{^id => s3_path} = s3_paths
127+
Logger.debug("Uploading `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`")
112128

113-
result = Recorder.S3.Utils.upload_file(path, bucket_name, s3_path, s3_config_overrides)
129+
result = Recorder.S3.Utils.upload_file(path, bucket_name, s3_path, s3_config_overrides)
114130

115-
case result do
116-
{:ok, _output} ->
117-
Logger.debug(
118-
"Successfully uploaded `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`"
119-
)
131+
case result do
132+
{:ok, _output} ->
133+
Logger.debug(
134+
"Successfully uploaded `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}`"
135+
)
120136

121-
{:error, reason} ->
122-
Logger.warning("""
123-
Upload of `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}` \
124-
failed with reason #{inspect(reason)}\
125-
""")
126-
end
137+
{:error, reason} ->
138+
Logger.warning("""
139+
Upload of `#{path}` to bucket `#{bucket_name}`, path `#{s3_path}` \
140+
failed with reason #{inspect(reason)}\
141+
""")
142+
end
143+
144+
{id, result}
145+
end)
127146

128-
{id, result}
129-
end)
147+
if upload_manifest do
148+
manifest_s3_path =
149+
Path.join(base_path, "manifest.json")
150+
151+
manifest_result =
152+
Recorder.S3.Utils.upload_manifest(
153+
download_manifest,
154+
bucket_name,
155+
manifest_s3_path,
156+
s3_config_overrides
157+
)
158+
159+
Map.put(results, :manifest, manifest_result)
160+
else
161+
results
162+
end
130163
end
131164
end
132165
else

0 commit comments

Comments
 (0)