Skip to content

Instantly share code, notes, and snippets.

@Object905
Created March 20, 2026 15:35
Show Gist options
  • Select an option

  • Save Object905/416e5bd31e58992f05217a027b4bef9a to your computer and use it in GitHub Desktop.

Select an option

Save Object905/416e5bd31e58992f05217a027b4bef9a to your computer and use it in GitHub Desktop.
use active_call::media::engine::StreamEngine;
use active_call::media::processor::Processor;
use active_call::media::track::TrackPacketSender;
use active_call::media::{AudioFrame, Samples, TrackId};
use anyhow::Result;
use axum::body::Body;
use axum::extract::Path;
use axum::response::{IntoResponse, Response};
use bytes::Bytes;
use flate2::Compression;
use flate2::write::GzEncoder;
use futures::FutureExt;
use reqwest::StatusCode;
use std::io::{self, Write as _};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::LazyLock;
use tokio_util::sync::CancellationToken;
use active_call::CallOption;
use active_call::event::EventSender;
/// Samples per channel in one 20 ms output frame at 16 kHz:
/// 16_000 samples/s * 0.020 s = 320.
const FRAME_SAMPLES: usize = 320;
/// Which stereo channel of the listener's output stream a PCM frame feeds.
#[derive(Clone, Copy)]
enum ListenChannel {
/// Left channel — the tracked participant's own audio
Left,
/// Right channel — audio from the special "server-side-track"
Right,
}
/// A chunk of mono PCM audio tagged with the stereo channel it belongs to.
#[derive(Clone)]
struct PcmFrame {
/// Destination channel in the interleaved listener stream.
channel: ListenChannel,
/// Signed 16-bit PCM samples (16 kHz per the `FRAME_SAMPLES` comment — TODO confirm upstream rate).
samples: Vec<i16>,
}
/// Global registry of live, listenable tracks: maps a track id to the
/// broadcast sender that fans its PCM frames out to HTTP listeners, plus
/// the call's `EventSender` (used to pair the server-side track with the
/// call it belongs to).
static LISTENED_CALLS: LazyLock<
    papaya::HashMap<TrackId, (tokio::sync::broadcast::Sender<PcmFrame>, EventSender)>,
> = LazyLock::new(papaya::HashMap::default);
/// Media-pipeline processor that mirrors a track's PCM frames into the
/// broadcast channel registered in `LISTENED_CALLS`.
struct RealTimeStreamProcessor {
/// Registry key; the entry under this id is removed when this processor drops.
track_id: TrackId,
/// Fan-out channel shared with the HTTP listener streams.
sender: tokio::sync::broadcast::Sender<PcmFrame>,
/// Stereo channel this track's audio feeds (Left = participant, Right = server-side track).
channel: ListenChannel,
}
impl RealTimeStreamProcessor {
    /// Creates a processor for a freshly tracked call: allocates a new
    /// broadcast channel and registers it, together with the call's
    /// `EventSender`, in the global `LISTENED_CALLS` registry.
    fn new(track_id: TrackId, event_sender: EventSender, channel: ListenChannel) -> Self {
        let (tx, _initial_rx) = tokio::sync::broadcast::channel(64);
        LISTENED_CALLS
            .pin()
            .insert(track_id.clone(), (tx.clone(), event_sender));
        Self::from_sender(track_id, tx, channel)
    }

    /// Wraps an already-registered broadcast sender without touching the
    /// registry (used to attach the server-side track to an existing call).
    fn from_sender(
        track_id: TrackId,
        sender: tokio::sync::broadcast::Sender<PcmFrame>,
        channel: ListenChannel,
    ) -> Self {
        Self {
            track_id,
            sender,
            channel,
        }
    }
}
impl Drop for RealTimeStreamProcessor {
/// Deregisters this track so new listeners get a 404.
/// NOTE(review): a processor built via `from_sender` (the server-side
/// track) removes its own `track_id`, which was never inserted under
/// that key — the remove is then a harmless no-op.
fn drop(&mut self) {
LISTENED_CALLS.pin().remove(&self.track_id);
}
}
impl Processor for RealTimeStreamProcessor {
    /// Mirrors each PCM frame to connected HTTP listeners.
    ///
    /// Cheap no-op when nobody is subscribed; non-PCM payloads pass
    /// through untouched. Send failures are deliberately ignored — a
    /// listener vanishing must not fail the media pipeline.
    fn process_frame(&mut self, frame: &mut AudioFrame) -> Result<()> {
        let has_listeners = self.sender.receiver_count() > 0;
        if has_listeners {
            if let Samples::PCM { samples } = &frame.samples {
                let _ = self.sender.send(PcmFrame {
                    channel: self.channel,
                    samples: samples.to_vec(),
                });
            }
        }
        Ok(())
    }
}
/// Serves a minimal self-contained HTML/JS player page for `track_id`.
///
/// The inline script re-fetches this same URL with `Accept: audio/pcm`
/// (routed to `stream_pcm` by the handler), reads the raw interleaved
/// i16 LE stereo bytes, carries partial frames over between chunks, and
/// schedules decoded Web Audio buffers back-to-back for gapless playback.
fn listen_html(track_id: &str) -> Response {
// The literal below is HTML/JS, so `{{`/`}}` are escaped braces for
// `format!`; only `{track_id}` is interpolated.
let html = format!(
r#"<!DOCTYPE html>
<html>
<head><title>Listen: {track_id}</title></head>
<body>
<p>Track: <code>{track_id}</code></p>
<button onclick="start()">Start</button>
<script>
async function start() {{
const RATE = 16000;
const CHANNELS = 2;
const ctx = new AudioContext({{ sampleRate: RATE }});
await ctx.resume();
const resp = await fetch(window.location.href, {{
headers: {{ Accept: "audio/pcm" }}
}});
const reader = resp.body.getReader();
let nextTime = 0;
let leftover = new Uint8Array(0);
while (true) {{
const {{ done, value }} = await reader.read();
if (done) break;
// prepend any leftover bytes from previous chunk
let data;
if (leftover.length > 0) {{
data = new Uint8Array(leftover.length + value.length);
data.set(leftover);
data.set(value, leftover.length);
}} else {{
data = value;
}}
// each sample is 2 bytes (i16 LE), interleaved stereo
const bytesPerFrame = CHANNELS * 2;
const usable = data.length - (data.length % bytesPerFrame);
if (usable === 0) {{
leftover = data;
continue;
}}
leftover = data.slice(usable);
const i16s = new Int16Array(data.buffer, data.byteOffset, usable / 2);
const samplesPerChannel = i16s.length / CHANNELS;
const buf = ctx.createBuffer(CHANNELS, samplesPerChannel, RATE);
const left = buf.getChannelData(0);
const right = buf.getChannelData(1);
for (let i = 0; i < samplesPerChannel; i++) {{
left[i] = i16s[i * 2] / 32768;
right[i] = i16s[i * 2 + 1] / 32768;
}}
const src = ctx.createBufferSource();
src.buffer = buf;
src.connect(ctx.destination);
const now = ctx.currentTime;
if (nextTime < now) nextTime = now;
src.start(nextTime);
nextTime += buf.duration;
}}
}}
</script>
</body>
</html>"#
);
// Static header set; builder cannot fail on these well-formed values.
Response::builder()
.header("content-type", "text/html; charset=utf-8")
.body(Body::from(html))
.unwrap()
}
/// Streams interleaved stereo PCM (i16 LE; left = participant, right =
/// server-side track) as a gzip-compressed chunked HTTP body.
///
/// Left-channel samples pace the output: once `FRAME_SAMPLES` left samples
/// are buffered, one frame is emitted; missing right-channel samples are
/// zero-filled so the two channels stay time-aligned.
///
/// Fix vs. original: when the broadcast channel closes, the gzip member is
/// now finalized with `finish()` so clients receive a valid trailer/CRC
/// instead of a truncated stream.
fn stream_pcm(mut rx: tokio::sync::broadcast::Receiver<PcmFrame>) -> Response {
    let stream = async_stream::stream! {
        let mut gz = GzEncoder::new(Vec::new(), Compression::fast());
        // Per-channel staging buffers: the two processors deliver frames
        // independently and they are re-interleaved here.
        let mut buf_self: Vec<i16> = Vec::new();
        let mut buf_other: Vec<i16> = Vec::new();
        loop {
            let frame = match rx.recv().await {
                Ok(frame) => frame,
                // All senders dropped: fall through to finalization below.
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                // We fell behind; the lost frames are gone, keep going.
                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
            };
            match frame.channel {
                ListenChannel::Left => buf_self.extend_from_slice(&frame.samples),
                ListenChannel::Right => buf_other.extend_from_slice(&frame.samples),
            }
            while buf_self.len() >= FRAME_SAMPLES {
                for i in 0..FRAME_SAMPLES {
                    // Writes into a Vec-backed encoder cannot fail.
                    gz.write_all(&buf_self[i].to_le_bytes()).unwrap();
                    // Zero-fill when the right channel is behind.
                    let other = *buf_other.get(i).unwrap_or(&0);
                    gz.write_all(&other.to_le_bytes()).unwrap();
                }
                buf_self.drain(..FRAME_SAMPLES);
                // Drain at most one frame's worth so the right channel
                // stays aligned with the left-paced output.
                let drained = buf_other.len().min(FRAME_SAMPLES);
                if drained > 0 {
                    buf_other.drain(..drained);
                }
                // Sync-flush so this frame is immediately decodable.
                gz.flush().unwrap();
                let compressed = std::mem::take(gz.get_mut());
                if !compressed.is_empty() {
                    yield Ok::<Bytes, io::Error>(Bytes::from(compressed));
                }
            }
        }
        // Terminate the gzip member so decompressors see a clean end of
        // stream (trailer + CRC) rather than a truncation error.
        if let Ok(tail) = gz.finish() {
            if !tail.is_empty() {
                yield Ok::<Bytes, io::Error>(Bytes::from(tail));
            }
        }
    };
    Response::builder()
        .header("content-type", "application/octet-stream")
        .header("content-encoding", "gzip")
        .header("cache-control", "no-cache, no-store")
        .body(Body::from_stream(stream))
        .unwrap()
}
/// HTTP handler for a track's listen endpoint.
///
/// Behavior (unchanged): unknown/inactive tracks 404 regardless of the
/// Accept header; `text/html` requests get the player page; everything
/// else subscribes to the live PCM broadcast and streams it.
pub async fn stream_track(
    Path(track_id): Path<String>,
    headers: axum::http::HeaderMap,
) -> Response {
    let wants_html = headers
        .get("accept")
        .and_then(|value| value.to_str().ok())
        .is_some_and(|accept| accept.contains("text/html"));
    // Scope the map guard so it is released before streaming begins.
    let receiver = {
        let registry = LISTENED_CALLS.pin();
        match registry.get(&track_id) {
            None => return (StatusCode::NOT_FOUND, "track not active").into_response(),
            Some(_) if wants_html => return listen_html(&track_id),
            Some((tx, _)) => tx.subscribe(),
        }
    };
    stream_pcm(receiver)
}
/// Boxed-future adapter: wraps the async implementation so this hook can
/// be stored and invoked through a plain `fn` returning
/// `Pin<Box<dyn Future>>` (the shape the engine expects).
pub fn custom_create_procesors_hook(
    engine: Arc<StreamEngine>,
    track_id: TrackId,
    cancel_token: CancellationToken,
    event_sender: EventSender,
    packet_sender: TrackPacketSender,
    option: CallOption,
) -> Pin<Box<dyn Future<Output = Result<Vec<Box<dyn Processor>>>> + Send>> {
    Box::pin(_custom_create_procesors_hook_inner(
        engine,
        track_id,
        cancel_token,
        event_sender,
        packet_sender,
        option,
    ))
}
/// Builds the engine's default processor chain, then appends a
/// `RealTimeStreamProcessor` that mirrors the track's PCM audio to HTTP
/// listeners.
///
/// The special "server-side-track" is paired with its originating call by
/// `EventSender` channel identity and feeds the right stereo channel;
/// every other track registers a fresh broadcast channel and feeds the
/// left one.
async fn _custom_create_procesors_hook_inner(
    engine: Arc<StreamEngine>,
    track_id: TrackId,
    cancel_token: CancellationToken,
    event_sender: EventSender,
    packet_sender: TrackPacketSender,
    option: CallOption,
) -> Result<Vec<Box<dyn Processor>>> {
    // Start from the engine's stock chain for this track.
    let mut processors = StreamEngine::default_create_procesors_hook(
        engine,
        track_id.clone(),
        cancel_token,
        event_sender.clone(),
        packet_sender,
        option,
    )
    .await?;
    let tap = if track_id == "server-side-track" {
        // Attach to the already-registered call that shares this
        // event_sender channel.
        let registry = LISTENED_CALLS.pin();
        let existing = registry
            .values()
            .find(|entry| entry.1.same_channel(&event_sender));
        match existing {
            Some((tx, _)) => Some(RealTimeStreamProcessor::from_sender(
                track_id,
                tx.clone(),
                ListenChannel::Right,
            )),
            None => {
                log::error!("server-side-track created before main track?");
                None
            }
        }
    } else {
        // Main participant track: register a new broadcast channel.
        Some(RealTimeStreamProcessor::new(
            track_id,
            event_sender,
            ListenChannel::Left,
        ))
    };
    if let Some(tap) = tap {
        processors.push(Box::new(tap));
    }
    Ok(processors)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment