defmodule RazorNewWeb.S3Writer do
  @moduledoc """
  Streams video uploads directly to an S3 bucket via a multipart upload,
  rather than going through a presigned URL.
  """
  use TypedStruct

  @typedoc "S3Writer struct"
  typedstruct do
    field(:filename, String.t(), enforce: true)
    field(:s3_config, ExAws.Config.t(), enforce: true)
    field(:s3_init_op, ExAws.Operation.S3.t(), default: nil)
    field(:s3_upload_op, ExAws.S3.Upload.t(), default: nil)
    # Start the part number at 0: the zero-th slot is consumed while initializing the
    # upload request, so the real chunks are numbered from 1, which is what ex_aws_s3 expects.
    field(:part_number, non_neg_integer(), default: 0)
    field(:parts, list(), default: [])
    field(:accumulated_chunk, binary(), default: <<>>)
  end
  @behaviour Phoenix.LiveView.UploadWriter

  alias ExAws.S3

  require Logger

  @impl Phoenix.LiveView.UploadWriter
  def init(opts) do
    Logger.info("inside s3 writer now")

    {s3_config, rest} = Keyword.pop(opts, :s3_config, ExAws.Config.new(:s3))
    file_name = Keyword.fetch!(rest, :filename)

    state = %__MODULE__{
      filename: file_name,
      s3_config: s3_config
    }

    state = setup_s3_upload_state(state)
    {:ok, state}
  end
  defp setup_s3_upload_state(state) do
    s3_upload_op = S3.upload([], state.s3_config.bucket, state.filename)

    s3_upload_op_with_upload_id =
      case S3.Upload.initialize(s3_upload_op, state.s3_config) do
        {:ok, s3_upload_op_with_upload_id} -> s3_upload_op_with_upload_id
        _ -> raise "Could not initiate the upload for #{state.filename}"
      end

    # Set the timeout to 3 minutes for slow connections.
    s3_upload_op_with_upload_id = %{
      s3_upload_op_with_upload_id
      | opts: Keyword.merge([timeout: 180 * 1000], s3_upload_op_with_upload_id.opts)
    }

    %{
      state
      | part_number: state.part_number + 1,
        s3_upload_op: s3_upload_op_with_upload_id
    }
  end
  @impl Phoenix.LiveView.UploadWriter
  def meta(state) do
    Map.take(state, [:filename, :s3_config, :part_number])
  end
  @impl Phoenix.LiveView.UploadWriter
  def write_chunk(data, state) do
    {accumulated_chunk, new_state} = accumulate_chunk(data, state)

    # S3 multipart uploads require every part except the last to be at least 5 MB,
    # so keep buffering until the accumulated chunk reaches that threshold.
    if byte_size(accumulated_chunk) >= 5 * 1024 * 1024 do
      upload_chunk(accumulated_chunk, new_state)
    else
      {:ok, new_state}
    end
  end

  defp accumulate_chunk(data, state) do
    new_accumulated_chunk = state.accumulated_chunk <> data
    new_state = %{state | accumulated_chunk: new_accumulated_chunk}
    {new_accumulated_chunk, new_state}
  end
  defp upload_chunk(data, state) do
    part_number = state.part_number

    case S3.Upload.upload_chunk(
           {data, part_number},
           Map.delete(state.s3_upload_op, :src),
           state.s3_config
         ) do
      {^part_number, _etag} = part ->
        {:ok,
         %{
           state
           | part_number: part_number + 1,
             parts: [part | state.parts],
             accumulated_chunk: <<>>
         }}

      {:error, reason} ->
        {:error, reason}
    end
  end
  @impl Phoenix.LiveView.UploadWriter
  def close(state, _reason) do
    # Flush any buffered data still below the 5 MB threshold as the final part.
    state =
      with chunk when chunk != <<>> <- state.accumulated_chunk,
           {:ok, flushed_state} <- upload_chunk(chunk, state) do
        flushed_state
      else
        _ -> state
      end

    case S3.Upload.complete(state.parts, state.s3_upload_op, state.s3_config) do
      {:ok, _} ->
        Logger.warning("[complete] S3 Upload #{inspect(state.s3_config)}")
        {:ok, state}

      {:error, reason} ->
        Logger.warning("[NOT complete] S3 Upload #{inspect(state.s3_config)}")
        {:error, reason, state}
    end
  end
end
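
# Usage sketch (not part of the original gist): a LiveView can hand uploads to this
# writer through the `:writer` option of `allow_upload/3` (available in LiveView 0.19+).
# The LiveView module name, upload name, key prefix, and bucket below are assumptions;
# the writer itself only requires `:filename` and an S3 config map that includes `:bucket`,
# since it reads `s3_config.bucket` in `setup_s3_upload_state/1`.
defmodule RazorNewWeb.VideoUploadLive do
  use RazorNewWeb, :live_view

  @impl true
  def mount(_params, _session, socket) do
    {:ok,
     allow_upload(socket, :video,
       accept: ~w(.mp4 .mov),
       max_file_size: 2_000_000_000,
       writer: fn _name, entry, _socket ->
         {RazorNewWeb.S3Writer,
          filename: "uploads/#{entry.client_name}",
          s3_config: ExAws.Config.new(:s3) |> Map.put(:bucket, "my-bucket")}
       end
     )}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <form id="video-form" phx-change="validate" phx-submit="save">
      <.live_file_input upload={@uploads.video} />
      <button type="submit">Upload</button>
    </form>
    """
  end

  @impl true
  def handle_event("validate", _params, socket), do: {:noreply, socket}

  def handle_event("save", _params, socket) do
    # The writer has already streamed the parts to S3; consuming the entries
    # simply clears them from the socket on the LiveView side.
    consume_uploaded_entries(socket, :video, fn _meta, _entry -> {:ok, :done} end)
    {:noreply, socket}
  end
end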