Last active
December 21, 2020 02:50
-
-
Save imayhaveborkedit/ff4b073ca1703a3b9364c3a21399b39e to your computer and use it in GitHub Desktop.
Multicast AudioSource
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import threading | |
import queue | |
import discord | |
class SourceMulticaster(threading.Thread):
    """Read one audio source on a background thread and fan it out to many consumers.

    Each consumer (keyed by a voice-client object) gets its own
    ``ProxySource`` backed by a small bounded queue.  The thread repeatedly
    calls ``source.read()`` and offers every chunk to every registered
    proxy queue.  When the source returns a falsy chunk, or ``stop()`` is
    called, all registered voice clients are stopped and the registry is
    emptied.

    ``_source_proxies`` is shared between the feeder thread and callers of
    ``get()``/``stop()``; every access to it is guarded by ``_lock``.
    """

    class ProxySource(discord.AudioSource):
        """Audio source that yields whatever the multicaster put on its queue."""

        def __init__(self, data_queue, cleanup_func, is_opus):
            """Store the feeding queue, the unregister callback and the opus flag.

            :param data_queue: queue the multicaster pushes audio chunks onto.
            :param cleanup_func: callable invoked with this proxy from ``cleanup()``.
            :param is_opus: value reported by ``is_opus()``.
            """
            self.data_queue = data_queue
            self.cleanup_func = cleanup_func
            self._is_opus = is_opus

        def read(self):
            """Return the next queued chunk, or ``None`` if none arrives within 0.5 s."""
            try:
                return self.data_queue.get(timeout=0.5)
            except queue.Empty:
                # NOTE(review): a falsy result presumably tells the player the
                # stream has ended -- confirm against the AudioSource contract.
                return None

        def is_opus(self):
            """Report whether the chunks are opus-encoded (copied from the real source)."""
            return self._is_opus

        def cleanup(self):
            """Unregister this proxy from the multicaster that created it."""
            self.cleanup_func(self)

    def __init__(self, source):
        """Create the (not yet started) feeder thread for ``source``.

        :param source: object exposing ``read()`` and ``is_opus()``.
        """
        super().__init__(target=self._run, daemon=True)
        self.source = source
        self._source_proxies = {}  # voice client -> ProxySource
        self._end = threading.Event()
        self._lock = threading.Lock()

    def get(self, vc):
        """Return the proxy source registered for ``vc``, creating it on first use.

        :param vc: hashable voice-client object used as the registry key.
        :returns: the ``ProxySource`` associated with ``vc``.
        """
        # The feeder thread iterates the registry under the same lock, so the
        # insert must be locked too; otherwise the iteration can fail with
        # "dictionary changed size during iteration".
        with self._lock:
            proxy = self._source_proxies.get(vc)
            if proxy is None:
                proxy = self._make_proxysource(vc)
                self._source_proxies[vc] = proxy
            return proxy

    def stop(self):
        """Signal the feeder loop to exit and stop every registered voice client."""
        self._end.set()
        self._do_cleanup()

    def _do_cleanup(self):
        """Empty the registry and call ``stop()`` on each client that was in it."""
        with self._lock:
            clients = list(self._source_proxies)
            self._source_proxies.clear()
        # Stop the clients only after releasing the lock: stopping a client may
        # lead to the proxy's cleanup callback, which takes the same
        # non-reentrant lock and would otherwise deadlock.
        for vc in clients:
            vc.stop()

    def _make_proxysource(self, vc):
        """Build a ``ProxySource`` for ``vc`` with a two-slot queue."""
        data_queue = queue.Queue(2)
        cleanup_func = self._make_cleanup(vc)
        return self.ProxySource(data_queue, cleanup_func, self.source.is_opus())

    def _make_cleanup(self, vc):
        """Return a callback that unregisters ``vc``'s proxy when it is cleaned up."""

        def _generated(ps):
            with self._lock:
                # Only drop the entry if it still refers to this proxy, so a late
                # cleanup of an old proxy cannot evict a newer one for the same vc.
                if self._source_proxies.get(vc) is ps:
                    del self._source_proxies[vc]

        return _generated

    def _run(self):
        """Thread body: read chunks from the source and offer them to every proxy."""
        loops = 0
        start = time.perf_counter()
        base_delay = 0.02  # seconds per iteration; presumably one audio frame -- confirm

        while not self._end.is_set():
            loops += 1
            data = self.source.read()

            if not data:
                # Source exhausted: stop all consumers and leave the loop.
                self.stop()
                break

            with self._lock:
                for ps in self._source_proxies.values():
                    try:
                        ps.data_queue.put_nowait(data)
                    except queue.Full:
                        # Best effort: a consumer that is not draining its queue
                        # misses this chunk instead of stalling everyone else.
                        pass

            # Schedule against the absolute start time so that per-iteration
            # overhead does not accumulate as drift.
            next_time = start + base_delay * loops
            delay = max(0, base_delay + (next_time - time.perf_counter()))
            time.sleep(delay)
async def example_usage():
    """Illustrate playing one source in two voice channels at once.

    ``source``, ``channel`` and ``another_channel`` are not defined here; the
    caller's surrounding scope is expected to provide them.
    """
    multicaster = SourceMulticaster(source)
    multicaster.start()

    # First listener: connect, then play the proxy registered for this client.
    first_vc = await channel.connect()
    first_proxy = multicaster.get(first_vc)
    first_vc.play(first_proxy)

    # Second listener receives the same audio through its own proxy.
    second_vc = await another_channel.connect()
    second_proxy = multicaster.get(second_vc)
    second_vc.play(second_proxy)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment