Created
March 7, 2024 18:09
-
-
Save spscream/a978129cf3d99bc2d1d5cb8b1eefe944 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule SipTrunk.ParticipantPipeline.UDP.Endpoint do | |
@moduledoc """ | |
Element that reads packets from a UDP socket and sends their payloads through the output pad. | |
And reads from input pad and sends through the same socket | |
""" | |
use Membrane.Endpoint | |
import Mockery.Macro | |
alias Membrane.{Buffer, RemoteStream} | |
alias Membrane.UDP.Socket | |
require Membrane.Logger | |
def_options local_port_no: [ | |
spec: pos_integer(), | |
default: 5000, | |
description: "A UDP port number used when opening a receiving socket." | |
], | |
local_address: [ | |
spec: :inet.socket_address(), | |
default: :any, | |
description: """ | |
An IP Address on which the socket will listen. It allows to choose which | |
network interface to use if there's more than one. | |
""" | |
], | |
destination_address: [ | |
spec: :inet.ip_address(), | |
description: "An IP Address that the packets will be sent to." | |
], | |
destination_port_no: [ | |
spec: :inet.port_number(), | |
description: "A UDP port number of a target." | |
], | |
recv_buffer_size: [ | |
spec: pos_integer(), | |
default: 1024 * 1024, | |
description: """ | |
Size of the receive buffer. Packages of size greater than this buffer will be truncated | |
""" | |
], | |
pierce_nat_ctx: [ | |
spec: | |
%{ | |
uri: URI.t(), | |
address: :inet.ip_address(), | |
port: pos_integer() | |
} | |
| nil, | |
default: nil, | |
description: """ | |
Context necessary to make an attempt to create client-side NAT binding | |
by sending an empty datagram from the `#{inspect(__MODULE__)}` to an arbitrary host. | |
* If left as `nil`, no attempt will ever be made. | |
* If filled in, whenever the pipeline switches playback to `:playing`, | |
one datagram (with an empty payload) will be sent from the opened socket | |
to the `:port` at `:address` provided via this option. | |
If `:address` is not present, it will be parsed from `:uri`. | |
Disclaimer: This is a potential vulnerability. Use with caution. | |
""" | |
] | |
def_input_pad :input, | |
accepted_format: _any, | |
availability: :on_request, | |
flow_control: :manual, | |
demand_unit: :buffers | |
def_output_pad :output, | |
accepted_format: %RemoteStream{type: :packetized}, | |
flow_control: :push | |
@impl true | |
def handle_init(_context, %__MODULE__{} = opts) do | |
state = %{ | |
local_socket: %Socket{ | |
ip_address: opts.local_address, | |
port_no: opts.local_port_no, | |
sock_opts: [recbuf: opts.recv_buffer_size] | |
}, | |
dst_socket: %Socket{ | |
ip_address: opts.destination_address, | |
port_no: opts.destination_port_no | |
}, | |
pierce_nat_ctx: opts.pierce_nat_ctx | |
} | |
{[], state} | |
end | |
@impl true | |
def handle_pad_added(Pad.ref(:input, _ref) = pad, _ctx, state) do | |
{[demand: pad], state} | |
end | |
@impl true | |
def handle_playing(_ctx, %{pierce_nat_ctx: nil} = state) do | |
{[stream_format: {:output, %RemoteStream{type: :packetized}}], state} | |
end | |
@impl true | |
def handle_playing(_ctx, %{pierce_nat_ctx: nat_ctx} = state) do | |
ip = | |
if is_nil(Map.get(nat_ctx, :address)), | |
do: parse_address(nat_ctx.uri), | |
else: nat_ctx.address | |
nat_ctx = Map.put(nat_ctx, :address, ip) | |
Socket.send(%Socket{ip_address: ip, port_no: nat_ctx.port}, state.local_socket, <<>>) | |
{[stream_format: {:output, %RemoteStream{type: :packetized}}], | |
%{state | pierce_nat_ctx: nat_ctx}} | |
end | |
@impl true | |
def handle_buffer(Pad.ref(:input, _ref) = pad, %Buffer{payload: payload}, _context, state) do | |
%{dst_socket: dst_socket, local_socket: local_socket} = state | |
case mockable(Socket).send(dst_socket, local_socket, payload) do | |
:ok -> {[demand: pad], state} | |
{:error, :eagain} -> | |
Membrane.Logger.warning("Get eagain on sending of UDP packet") | |
{[demand: pad], state} | |
{:error, cause} -> raise "Error sending UDP packet, reason: #{inspect(cause)}" | |
end | |
end | |
@impl true | |
def handle_parent_notification( | |
{:udp, _socket_handle, _addr, _port_no, _payload} = meta, | |
ctx, | |
state | |
) do | |
handle_info(meta, ctx, state) | |
end | |
@impl true | |
def handle_info( | |
{:udp, _socket_handle, address, port_no, payload}, | |
%{playback: :playing}, | |
state | |
) do | |
metadata = | |
Map.new() | |
|> Map.put(:udp_source_address, address) | |
|> Map.put(:udp_source_port, port_no) | |
|> Map.put(:arrival_ts, Membrane.Time.vm_time()) | |
actions = [buffer: {:output, %Buffer{payload: payload, metadata: metadata}}] | |
{actions, state} | |
end | |
@impl true | |
def handle_info( | |
{:udp, _socket_handle, _address, _port_no, _payload}, | |
_ctx, | |
state | |
) do | |
{[], state} | |
end | |
@impl true | |
def handle_setup(ctx, %{local_socket: %Socket{} = local_socket} = state) do | |
case mockable(Socket).open(local_socket) do | |
{:ok, socket} -> | |
notification = {:connection_info, socket.ip_address, socket.port_no} | |
Membrane.ResourceGuard.register( | |
ctx.resource_guard, | |
fn -> close_socket(socket) end, | |
tag: :udp_guard | |
) | |
{[notify_parent: notification], %{state | local_socket: socket}} | |
{:error, reason} -> | |
raise "Error opening UDP socket, reason: #{inspect(reason)}" | |
end | |
end | |
defp close_socket(%Socket{} = local_socket) do | |
mockable(Socket).close(local_socket) | |
end | |
defp parse_address(uri) do | |
hostname = | |
URI.parse(uri) | |
|> Map.get(:host) | |
|> to_charlist() | |
Enum.find_value([:inet, :inet6, :local], fn addr_family -> | |
case :inet.getaddr(hostname, addr_family) do | |
{:ok, address} -> address | |
{:error, _reason} -> false | |
end | |
end) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment