Skip to content

Instantly share code, notes, and snippets.

@lawik
Created January 16, 2024 14:00
Show Gist options
  • Save lawik/1314aae3f13025bc38e5d894700b9181 to your computer and use it in GitHub Desktop.
Save lawik/1314aae3f13025bc38e5d894700b9181 to your computer and use it in GitHub Desktop.
Membrane live streaming, old stream files
defmodule UnderjordWeb.LiveLive do
  @moduledoc """
  LiveView that plays a single HLS stream selected by the `"name"` route parameter.

  The video player is rendered as soon as an `index.m3u8` playlist exists for
  the stream under `priv/static/livestream/<name>/`. The view subscribes to the
  `"livestream:<name>"` PubSub topic and reacts to two messages:

    * `:started` - the stream has playable output and is live
    * `:ended` - the stream is over; the recorded files remain playable
  """
  use UnderjordWeb, :live_view

  @impl true
  def mount(%{"name" => name}, _session, socket) do
    # Path as served to the browser; the same path below `priv/static/` is
    # where the playlist is looked up on disk.
    index_path = Path.join(["/livestream", name, "index.m3u8"])

    # A playlist on disk means the stream produced output at some point, so
    # there is something to play even if nothing is being broadcast right now.
    started? = "priv/static/" |> Path.join(index_path) |> File.exists?()

    # mount/3 also runs for the initial static render, where the process ends
    # with the HTTP request; only the connected LiveView needs the updates.
    if Phoenix.LiveView.connected?(socket) do
      Phoenix.PubSub.subscribe(Underjord.PubSub, "livestream:#{name}")
    end

    socket =
      assign(socket,
        name: name,
        index_path: index_path,
        # NOTE(review): `type_body_tag` and `immersive?` are not read in this
        # module; presumably the layout consumes them - confirm.
        type_body_tag: "video",
        immersive?: true,
        live?: false,
        started?: started?
      )

    {:ok, socket}
  end

  @impl true
  def handle_info(:started, socket) do
    {:noreply, assign(socket, started?: true, live?: true)}
  end

  def handle_info(:ended, socket) do
    # The stream stays "started" after it ends so the recording keeps playing.
    {:noreply, assign(socket, live?: false, started?: true)}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <%= if @started? do %>
      <video id="video" phx-hook="LiveVideoAdded" data-path={Routes.static_path(@socket, @index_path)} controls class="max-h-screen max-w-screen mx-auto"></video>
      <%= if @live? do %>
        <div class="absolute bottom-2 right-2 color-u-pink">
          Live!
        </div>
      <% end %>
    <% else %>
      <p class="mt-8 w-1/2 mx-auto text-center">Nothing yet, please hold on...</p>
    <% end %>
    """
  end
end
defmodule Underjord.LiveStream do
  @moduledoc """
  Membrane pipeline that turns one RTMP input into HLS files on disk.

  The audio and video outputs of a `Membrane.RTMP.Bin` (configured with the
  given `port`) are linked into a `Membrane.HTTPAdaptiveStream.SinkBin` that
  stores its output in `priv/static/livestream/<name>/`.

  Progress is announced over `Underjord.PubSub`:

    * `{:ready, name}` on `"livestreams"` when the pipeline is initialised
    * `:started` on `"livestream:<name>"` and `{:started, name}` on
      `"livestreams"` once both tracks are playable
    * `:ended` on `"livestream:<name>"` and `{:ended, name}` on
      `"livestreams"` once both tracks reached end of stream

  The pipeline process also joins the `"open-livestreams"` `:pg` group so the
  running streams can be enumerated.
  """
  use Membrane.Pipeline
  require Logger

  # NOTE(review): these aliases are only referenced by the commented-out tee
  # branch below (`Peakmeter` is not referenced at all); they are kept so that
  # branch can be re-enabled without further edits.
  alias Membrane.Element.Tee
  alias Membrane.Audiometer.Peakmeter
  alias Membrane.Element.Fake

  # One audio and one video track are linked into the HLS sink.
  @expected_tracks 2

  @impl true
  def handle_init(%{name: name, port: port}) do
    path = Path.join("priv/static/livestream", name)
    :pg.join("open-livestreams", self())
    File.mkdir_p!(path)

    children = %{
      rtmp_server: %Membrane.RTMP.Bin{port: port},
      # tee: Tee.Master,
      # audio_sink: Fake.Sink.Buffers,
      # aac_decoder: Membrane.AAC.FDK.Decoder,
      # converter: %Membrane.FFmpeg.SWResample.Converter{
      #   output_caps: %Membrane.Caps.Audio.Raw{
      #     format: :s16le,
      #     sample_rate: 48000,
      #     channels: 2
      #   }
      # },
      hls: %Membrane.HTTPAdaptiveStream.SinkBin{
        manifest_module: Membrane.HTTPAdaptiveStream.HLS,
        target_window_duration: :infinity,
        muxer_segment_duration: 5 |> Membrane.Time.seconds(),
        persist?: true,
        storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{directory: path}
      }
    }

    links = [
      link(:rtmp_server)
      |> via_out(:audio)
      |> via_in(Pad.ref(:input, :audio), options: [encoding: :AAC])
      |> to(:hls),
      # |> to(:tee),
      # link(:tee)
      # |> via_out(:master)
      # |> via_in(Pad.ref(:input, :audio), options: [encoding: :AAC])
      # |> to(:hls),
      # link(:tee)
      # |> via_out(:copy)
      # |> to(:aac_decoder)
      # |> to(:converter)
      # |> to(:audio_sink),
      link(:rtmp_server)
      |> via_out(:video)
      |> via_in(Pad.ref(:input, :video), options: [encoding: :H264])
      |> to(:hls)
    ]

    spec = %ParentSpec{children: children, links: links}
    Phoenix.PubSub.broadcast!(Underjord.PubSub, "livestreams", {:ready, name})

    # `started` holds the refs of tracks reported playable, `ended` counts the
    # end-of-stream notifications received from the HLS sink.
    {{:ok, spec: spec}, %{name: name, started: MapSet.new(), ended: 0}}
  end

  @impl true
  def handle_notification({:track_playable, ref}, :hls, _ctx, state) do
    started = MapSet.put(state.started, ref)

    if MapSet.size(started) >= @expected_tracks do
      announce(state.name, :started)
    end

    {:ok, %{state | started: started}}
  end

  def handle_notification(:end_of_stream, :hls, _ctx, state) do
    ended = state.ended + 1

    if ended >= @expected_tracks do
      announce(state.name, :ended)
    end

    {:ok, %{state | ended: ended}}
  end

  def handle_notification(_msg, _element, _context, state) do
    # Every other notification is deliberately ignored.
    {:ok, state}
  end

  # Publishes `event` on the per-stream topic and, tagged with the stream
  # name, on the global "livestreams" topic.
  defp announce(name, event) do
    Phoenix.PubSub.broadcast!(Underjord.PubSub, "livestream:#{name}", event)
    Phoenix.PubSub.broadcast!(Underjord.PubSub, "livestreams", {event, name})
  end
end
defmodule UnderjordWeb.LivestreamsLive do
  @moduledoc """
  LiveView that lists livestreams and lets the user create a new one.

  Two lists are shown:

    * "Running" - names of the pipelines currently in the `"open-livestreams"`
      `:pg` group
    * "Existing" - directories below `priv/static/livestream` that contain an
      `index.m3u8` playlist

  Both lists are refreshed whenever a `{:ready, name}`, `{:started, name}` or
  `{:ended, name}` message arrives on the `"livestreams"` PubSub topic.
  """
  use UnderjordWeb, :live_view

  @livestreams_path "priv/static/livestream"
  @pg_group "open-livestreams"

  @impl true
  def mount(_params, _session, socket) do
    # mount/3 also runs for the initial static render; only the connected
    # LiveView process lives long enough to receive updates.
    if Phoenix.LiveView.connected?(socket) do
      Phoenix.PubSub.subscribe(Underjord.PubSub, "livestreams")
    end

    {:ok, refresh(socket)}
  end

  @impl true
  def handle_event("create", %{"name" => name, "port" => port}, socket) do
    # The form values are untrusted: the name ends up in a filesystem path and
    # a malformed port must not crash the LiveView.
    with {:ok, name} <- validate_name(name),
         {:ok, port} <- validate_port(port) do
      Underjord.go_live(name, port)
      {:noreply, socket}
    else
      {:error, message} ->
        {:noreply, Phoenix.LiveView.put_flash(socket, :error, message)}
    end
  end

  @impl true
  def handle_info({event, _name}, socket) when event in [:ready, :started, :ended] do
    {:noreply, refresh(socket)}
  end

  # Re-reads both stream lists and stores them in the socket assigns.
  defp refresh(socket) do
    assign(socket, started: list_started(), open: list_open())
  end

  # Accepts a non-blank name that is a single path segment, so it cannot
  # point outside the livestream directory.
  defp validate_name(name) when is_binary(name) do
    cond do
      String.trim(name) == "" ->
        {:error, "Stream name cannot be blank"}

      name in [".", ".."] or String.contains?(name, ["/", "\\", <<0>>]) ->
        {:error, "Stream name cannot contain path separators"}

      true ->
        {:ok, name}
    end
  end

  defp validate_name(_name), do: {:error, "Stream name cannot be blank"}

  # Parses the port field into an integer within the valid TCP port range.
  defp validate_port(port) when is_binary(port) do
    case port |> String.trim() |> Integer.parse() do
      {number, ""} when number in 1..65_535 -> {:ok, number}
      _other -> {:error, "Port must be a whole number between 1 and 65535"}
    end
  end

  defp validate_port(_port), do: {:error, "Port must be a whole number between 1 and 65535"}

  # Returns the sorted names of the stream directories that hold a playlist.
  defp list_started do
    case File.ls(@livestreams_path) do
      {:ok, entries} ->
        entries
        |> Enum.filter(fn folder_name ->
          [@livestreams_path, folder_name, "index.m3u8"]
          |> Path.join()
          |> File.exists?()
        end)
        |> Enum.sort()

      # Nothing has been streamed yet, so the directory was never created.
      {:error, :enoent} ->
        []

      {:error, reason} ->
        raise File.Error, reason: reason, action: "list directory", path: @livestreams_path
    end
  end

  # Returns the stream name of every pipeline process in the :pg group.
  #
  # NOTE(review): this reads the name out of each pipeline's process state via
  # `:sys.get_state/1`, which depends on the state keeping the user state
  # under `:internal_state` - confirm when upgrading Membrane.
  defp list_open do
    @pg_group
    |> :pg.get_members()
    |> Enum.map(fn pid ->
      pid
      |> :sys.get_state()
      |> get_in([:internal_state, :name])
    end)
  end

  @impl true
  def render(assigns) do
    ~H"""
    <form phx-submit="create" class="m-4">
      <label class="m-4">
        Name
        <input type="text" name="name" id="name" class="ml-2 mr-4 p-2 px-4 text-u-white bg-u-black border-u-cyan border rounded-md" />
      </label>
      <label class="m-4">
        Port
        <input type="text" name="port" id="port" value="5000" class="ml-2 mr-4 p-2 px-4 text-u-white bg-u-black border-u-magenta border rounded-md" />
      </label>
      <input type="submit" value="Create" class="p-2 px-4 text-u-black bg-u-purple rounded-md" />
    </form>
    <h2>Running</h2>
    <ul class="mx-8">
      <%= for stream_name <- @open do %>
        <li class="my-2"><a href={Routes.live_path(@socket, UnderjordWeb.LiveLive, stream_name)}>
          <%= stream_name %>
        </a>
        </li>
      <% end %>
    </ul>
    <h2>Existing</h2>
    <ul class="mx-8">
      <%= for stream_name <- @started do %>
        <li class="my-2"><a href={Routes.live_path(@socket, UnderjordWeb.LiveLive, stream_name)}>
          <%= stream_name %>
        </a>
        </li>
      <% end %>
    </ul>
    """
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment