Skip to content

Instantly share code, notes, and snippets.

@Goblinlordx
Created April 13, 2025 21:37
Show Gist options
  • Select an option

  • Save Goblinlordx/6ce2faa8fa8ad67d9fd9d524e712de59 to your computer and use it in GitHub Desktop.

Select an option

Save Goblinlordx/6ce2faa8fa8ad67d9fd9d524e712de59 to your computer and use it in GitHub Desktop.
RTP relay
package trackrelay
import (
"context"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
type Relay struct {
track *webrtc.TrackRemote
buffer *RingBuffer[*IdentifiablePacket]
close chan struct{}
}
type IdentifiablePacket struct {
*rtp.Packet
}
func (p IdentifiablePacket) ID() uint16 {
return p.SequenceNumber
}
func NewRelay(tr *webrtc.TrackRemote) (*Relay, error) {
buffer := NewRingBuffer[*IdentifiablePacket](500)
close := make(chan struct{})
go func() {
rtpBuffer := make([]byte, 1500)
for {
_, _, err := tr.Read(rtpBuffer)
if err != nil {
return
}
packet := &rtp.Packet{}
err = packet.Unmarshal(rtpBuffer)
if err != nil {
return
}
buffer.Push(&IdentifiablePacket{Packet: packet})
select {
case <-close:
return
default:
continue
}
}
}()
return &Relay{
track: tr,
buffer: buffer,
close: close,
}, nil
}
func (r *Relay) Subscribe(ctx context.Context, pc *webrtc.PeerConnection) {
// New TrackLocal for subsriber
tl, err := webrtc.NewTrackLocalStaticRTP(
r.track.Codec().RTPCodecCapability,
"video",
r.track.StreamID(),
)
if err != nil {
return
}
rtpSender, err := pc.AddTrack(tl)
rtpSub := r.buffer.Subscribe(ctx, 10)
go func() {
rtcpBuf := make([]byte, 1500)
for {
select {
case <-r.close:
return
default:
}
if _, _, err := rtpSender.Read(rtcpBuf); err != nil {
return
}
packets, err := rtcp.Unmarshal(rtcpBuf)
if err != nil {
continue
}
for _, packet := range packets {
switch rtcpPacket := packet.(type) {
// Handle subscriber NACKs
case *rtcp.TransportLayerNack:
for _, nack := range rtcpPacket.Nacks {
if pkt, ok := r.buffer.GetByID(nack.PacketID); ok {
if err := tl.WriteRTP(pkt.Packet); err != nil {
return
}
}
}
case *rtcp.PictureLossIndication:
}
}
}
}()
go func() {
for packet := range rtpSub {
tl.WriteRTP(packet.Packet)
}
pc.RemoveTrack(rtpSender)
}()
}
package trackrelay
import (
"container/ring"
"context"
"sync"
)
type Identifiable interface {
ID() uint16
}
type Subscriber[T Identifiable] chan T
type RingBuffer[T Identifiable] struct {
mu sync.RWMutex
ring *ring.Ring
capacity int
index map[uint16]*ring.Ring
indexMu sync.RWMutex
subs map[Subscriber[T]]struct{}
subsMu sync.RWMutex
}
func NewRingBuffer[T Identifiable](capacity int) *RingBuffer[T] {
return &RingBuffer[T]{
ring: ring.New(capacity),
capacity: capacity,
index: make(map[uint16]*ring.Ring),
subs: make(map[Subscriber[T]]struct{}),
}
}
func (rb *RingBuffer[T]) Push(item T) {
rb.mu.Lock()
defer rb.mu.Unlock()
id := item.ID()
// Remove old item from index if overwriting
if rb.ring.Value != nil {
oldID := rb.ring.Value.(T).ID()
rb.indexMu.Lock()
delete(rb.index, oldID)
rb.indexMu.Unlock()
}
// Update ring
rb.ring.Value = item
current := rb.ring
rb.ring = rb.ring.Next()
// Update index
rb.indexMu.Lock()
rb.index[id] = current
rb.indexMu.Unlock()
// Notify subscribers
rb.notifySubscribers(item)
}
func (rb *RingBuffer[T]) GetByID(id uint16) (T, bool) {
rb.indexMu.RLock()
defer rb.indexMu.RUnlock()
node, exists := rb.index[id]
if !exists || node.Value == nil {
var zero T
return zero, false
}
return node.Value.(T), true
}
func (rb *RingBuffer[T]) Items() []T {
rb.mu.RLock()
defer rb.mu.RUnlock()
items := make([]T, 0, rb.capacity)
rb.ring.Do(func(p interface{}) {
if p != nil {
items = append(items, p.(T))
}
})
return items
}
func (rb *RingBuffer[T]) Subscribe(ctx context.Context, bufferSize int) Subscriber[T] {
sub := make(Subscriber[T], bufferSize)
rb.subsMu.Lock()
rb.subs[sub] = struct{}{}
rb.subsMu.Unlock()
// Cleanup when context is done
go func() {
<-ctx.Done()
rb.unsubscribe(sub)
}()
// Send existing items
rb.sendExistingItems(sub, ctx)
return sub
}
func (rb *RingBuffer[T]) Unsubscribe(sub Subscriber[T]) {
rb.unsubscribe(sub)
}
func (rb *RingBuffer[T]) notifySubscribers(item T) {
rb.subsMu.RLock()
defer rb.subsMu.RUnlock()
for sub := range rb.subs {
select {
case sub <- item:
default:
// Skip if subscriber can't keep up
}
}
}
func (rb *RingBuffer[T]) unsubscribe(sub Subscriber[T]) {
rb.subsMu.Lock()
defer rb.subsMu.Unlock()
if _, exists := rb.subs[sub]; exists {
delete(rb.subs, sub)
close(sub)
}
}
func (rb *RingBuffer[T]) sendExistingItems(sub Subscriber[T], ctx context.Context) {
rb.mu.RLock()
items := make([]T, 0, rb.capacity)
rb.ring.Do(func(p interface{}) {
if p != nil {
items = append(items, p.(T))
}
})
rb.mu.RUnlock()
go func() {
for _, item := range items {
select {
case sub <- item:
case <-ctx.Done():
return
}
}
}()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment