Created
November 8, 2021 00:03
-
-
Save aidiary/767786370ba6ffc36e2a1577024ceaf6 to your computer and use it in GitHub Desktop.
リングバッファを用いたパススルー再生
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import sounddevice as sd | |
import soundfile as sf | |
from ring_buffer import InputRingBuffer | |
input_device = 'Yeti Stereo Microphone' | |
output_device = 'MacbookProのスピーカー' | |
fft_size = 1024 | |
hop_size = 256 | |
analysis_window = np.hanning(fft_size) | |
synthesis_window = np.hanning(fft_size) | |
# input curcular buffer | |
# ファイルやマイクからの入力サンプルを書き込むバッファ | |
buffer_size = 16384 | |
input_buffer = InputRingBuffer(buffer_size) | |
hop_counter = 0 | |
# output circular buffer | |
output_buffer = np.zeros(buffer_size) | |
output_buffer_write_pointer = hop_size | |
output_buffer_read_pointer = 0 | |
def process_fft(in_buffer, out_buffer, out_pointer): | |
unwrapped_buffer = np.zeros(fft_size) | |
for n in range(0, fft_size): | |
# in_pointerの位置から過去fft_sizeの範囲をfftする | |
# リアルタイム処理はcausalなので過去のサンプルしか使えない | |
# buffer_sizeはindexが負の値になったときにバッファの逆側から使うため | |
unwrapped_buffer[n] = in_buffer.get(in_buffer.head - fft_size + n) * analysis_window[n] | |
spectrum = np.fft.fft(unwrapped_buffer) | |
# IFFTで音声波形を復元 | |
reconstructed_wave = np.fft.ifft(spectrum) | |
# out_pointerの位置から書き出す | |
for n in range(0, fft_size): | |
circular_buffer_index = (out_pointer + n) % buffer_size | |
# ifftの結果は複素数なので実部のみ取り出す | |
# 上書きではなく前の結果と足し算するのがポイント(Overlap-Addのため) | |
# 音量が小さくなるので5倍する | |
out_buffer[circular_buffer_index] += reconstructed_wave[n].real * synthesis_window[n] * 5.0 | |
def callback(indata, outdata, frames, time, status): | |
global hop_counter, output_buffer_read_pointer, output_buffer_write_pointer | |
for n in range(0, frames): | |
# 入力処理 | |
# 入力音声をinput_bufferにためていく | |
# 1チャンネルの音声のみ利用 | |
input = indata[n, 0] | |
input_buffer.add(input) | |
# 出力処理 | |
# 処理済みのoutput_bufferの音声をスピーカーに出力 | |
# output_bufferから現在のサンプルでの出力を取得 | |
out = output_buffer[output_buffer_read_pointer] | |
# 読み込んだら次のoverlap-addで使うためにそなえてそこの場所はクリアしておく | |
# overlap-addは値を上書きをせずに足し算するだけなのでクリアしておかないといけない | |
output_buffer[output_buffer_read_pointer] = 0 | |
# 波形がオーバラップしてる部分はオーバラップの回数分平均して出力 | |
out *= hop_size / fft_size | |
output_buffer_read_pointer += 1 | |
# circular buffer | |
if output_buffer_read_pointer >= buffer_size: | |
output_buffer_read_pointer = 0 | |
# input_bufferにhop_sizeたまるたびにFFTを開始 | |
hop_counter += 1 | |
if hop_counter >= hop_size: | |
hop_counter = 0 | |
# input_buffer_pointerの位置から過去fft_size分のサンプルを使う | |
# output_buffer_write_pointerの位置からfft_size分だけ結果を書き込む | |
process_fft(input_buffer, output_buffer, output_buffer_write_pointer) | |
# Overlap Addするためにoutput_buffer_write_pointerはhop_size分だけすすめる | |
# output_buffer_read_pointerよりは常に先行している必要がある | |
output_buffer_write_pointer = (output_buffer_write_pointer + hop_size) % buffer_size | |
outdata[n, 0] = out | |
outdata[n, 1] = out | |
with sd.Stream(device=(input_device, output_device), | |
samplerate=44100, | |
channels=2, | |
blocksize=512, | |
latency=0, | |
callback=callback): | |
input() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
テストだよ。