Audio async render queue
bl_info = {
    "name": "AI Audio Generation Queue",
    "author": "Your Name (and contributors to the original code)",
    "version": (1, 0, 24),  # No functional change for this issue, version kept
    "blender": (3, 4, 0),
    "location": "Video Sequence Editor > Sidebar (N-Panel) > AI Tools",
    "description": "Adds audio generation jobs to a queue, processes them asynchronously, and adds results to the VSE.",
    "warning": "Alpha version. Requires manual installation of AI model dependencies. Check console for errors.",
    "doc_url": "https://your-documentation-link-here.com",
    "category": "Sequencer",
}
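# Dependency note (not installed by this addon): depending on the selected model, the
# generation thread imports torch, torchaudio, scipy, numpy, diffusers, transformers,
# parler_tts, bark, whisperspeech, librosa and mmaudio. These must be installed into
# Blender's bundled Python; exact package names and install steps vary per project, so
# consult each project's documentation.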
import bpy
import threading
import time
import os
import uuid
from bpy.props import (
    StringProperty,
    IntProperty,
    FloatProperty,
    BoolProperty,
    EnumProperty,
    CollectionProperty,
    PointerProperty,
)
from bpy.types import Operator, Panel, PropertyGroup
import random
from fractions import Fraction
from math import gcd
try:
    import torch
except ImportError:
    # Use bl_info.get("name", fallback) for the addon name in messages
    addon_display_name = bl_info.get("name", "AI Audio Addon")
    print(f"{addon_display_name}: PyTorch (torch) library not found. GPU/MPS acceleration and some models may not be available.")
    torch = None

AUDIO_JOB_LOCK = threading.Lock()
COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE = []
CURRENTLY_PROCESSING_JOB_ID = None
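# Worker threads append result dicts to COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE while holding
# AUDIO_JOB_LOCK; the modal operator's timer drains the queue on the main thread, so
# Blender data is only touched from the UI thread. CURRENTLY_PROCESSING_JOB_ID holds the
# id of the job whose thread is currently running, so only one generation job runs at a time.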
def get_addon_module_name():
    if __name__ == "__main__":
        # For direct script execution, use the filename as the module name
        return os.path.splitext(os.path.basename(__file__))[0]
    # When run as an addon, __name__ is the module name (or package.module)
    return __name__.split('.')[0]

def solve_path(path): return bpy.path.abspath(path)

def clean_filename(name):
    name = "".join(c if c.isalnum() or c in " ._-" else "_" for c in name)
    return name[:200]
def clear_cuda_cache():
    print("Attempting to clear CUDA cache...")
    try:
        if torch and torch.cuda.is_available():
            torch.cuda.empty_cache()
            print("CUDA cache cleared.")
        elif torch:  # PyTorch is available, but CUDA isn't
            print("CUDA not available, no GPU cache to clear via torch.cuda.")
        else:  # PyTorch itself is not available
            print("PyTorch not available, cannot clear CUDA cache.")
    except Exception as e:
        print(f"Error in clear_cuda_cache: {e}")

def print_elapsed_time(start_time): print(f"Elapsed: {time.time() - start_time:.2f}s")

def low_vram(): return False
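# find_first_empty_channel(): returns the lowest VSE channel whose strips do not overlap
# the [start_frame, end_frame) range, falling back to one above the highest occupied channel.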
def find_first_empty_channel(start_frame, end_frame):
    scene = bpy.context.scene
    if not scene.sequence_editor: return 1
    max_channel = 0
    sequences = scene.sequence_editor.sequences_all
    if not sequences: return 1
    for seq in sequences:
        if seq.channel > max_channel: max_channel = seq.channel
    for ch in range(1, max_channel + 3):
        occupied = False
        for seq in sequences:
            if seq.channel == ch:
                if max(start_frame, seq.frame_final_start) < min(end_frame, seq.frame_final_end):
                    occupied = True
                    break
        if not occupied: return ch
    return max_channel + 1
# --- Audio Generation Task (runs in a background worker thread) ---
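# _generate_audio_task() runs entirely off the main thread: it loads the selected model,
# runs inference, writes a .wav (or .mp4 for MMAudio with a video/image input) to
# job_data['output_filepath'], and reports success or failure by appending a result dict
# to COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE under AUDIO_JOB_LOCK. It never touches bpy data.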
def _generate_audio_task(job_data): | |
global COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE | |
job_id = job_data['id'] | |
print(f"Starting audio generation for job: {job_id}, Prompt: {job_data['prompt'][:30]}...") | |
_torch_thread = None | |
try: | |
import torch as _torch_thread | |
except ImportError: | |
with AUDIO_JOB_LOCK: | |
COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE.append({'id': job_id, 'status': 'ERROR', 'filepath': None, 'error': "PyTorch not found in generation thread."}) | |
return | |
import torchaudio | |
import scipy | |
from scipy.io.wavfile import write as write_wav | |
audio_model_card = job_data['audio_model_card'] | |
local_files_only = job_data['local_files_only'] | |
gfx_device = job_data['gfx_device'] | |
prompt_text = job_data['prompt'] | |
negative_prompt_text = job_data['negative_prompt'] | |
movie_num_inference_steps = job_data['inference_steps'] | |
movie_num_guidance = job_data['guidance_scale'] | |
audio_length_in_s = job_data['audio_length_s'] | |
seed = job_data['seed'] | |
parler_direction_prompt = job_data.get('parler_direction_prompt') | |
input_movie_path = job_data.get('input_movie_path') | |
input_image_path = job_data.get('input_image_path') | |
output_filepath = job_data['output_filepath'] | |
pipe = None | |
tokenizer = None | |
try: | |
if audio_model_card == "stabilityai/stable-audio-open-1.0": | |
from diffusers import StableAudioPipeline | |
repo_id = "stabilityai/stable-audio-open-1.0" | |
pipe = StableAudioPipeline.from_pretrained(repo_id, torch_dtype=_torch_thread.float16, local_files_only=local_files_only) | |
if low_vram(): pipe.enable_model_cpu_offload() | |
else: pipe.to(gfx_device) | |
elif audio_model_card == "WhisperSpeech": | |
from whisperspeech.pipeline import Pipeline | |
pipe = Pipeline(s2a_ref=job_data.get('whisper_model_ref', "collabora/whisperspeech:s2a-q4-small-en+pl.model")) | |
elif audio_model_card in ("parler-tts/parler-tts-large-v1", "parler-tts/parler-tts-mini-v1"): | |
from parler_tts import ParlerTTSForConditionalGeneration | |
from transformers import AutoTokenizer | |
print(f"Attempting to load Parler-TTS model: {audio_model_card} (standard loading)") | |
try: | |
pipe = ParlerTTSForConditionalGeneration.from_pretrained( | |
audio_model_card, | |
local_files_only=local_files_only | |
).to(gfx_device) | |
if gfx_device != "cpu": | |
if _torch_thread.cuda.is_available() and _torch_thread.cuda.is_bf16_supported() and 'large' in audio_model_card: | |
print("Parler-TTS: Attempting bfloat16.") | |
try: pipe = pipe.to(dtype=_torch_thread.bfloat16) | |
except Exception as e_bf16: print(f"Parler-TTS: bfloat16 conversion failed: {e_bf16}") | |
elif hasattr(pipe, "half"): | |
print("Parler-TTS: Attempting float16 (half precision).") | |
try: pipe = pipe.half() | |
except Exception as e_fp16: print(f"Parler-TTS: float16 conversion failed: {e_fp16}") | |
tokenizer = AutoTokenizer.from_pretrained( | |
audio_model_card, | |
local_files_only=local_files_only | |
) | |
print(f"Successfully loaded Parler-TTS model and tokenizer for {audio_model_card}") | |
except Exception as e_parler: | |
print(f"Error during Parler-TTS loading for {audio_model_card}: {e_parler}") | |
raise | |
elif audio_model_card == "bark": | |
import numpy as np | |
from bark.generation import generate_text_semantic, preload_models | |
from bark.api import semantic_to_waveform | |
from bark import SAMPLE_RATE | |
preload_models( text_use_small=True, coarse_use_small=True, fine_use_gpu=True, fine_use_small=True) | |
elif audio_model_card == "MMAudio": | |
import librosa | |
import mmaudio | |
from mmaudio.eval_utils import ModelConfig, all_model_cfg, generate, load_video, load_image, make_video, VideoInfo | |
from mmaudio.model.flow_matching import FlowMatching | |
from mmaudio.model.networks import MMAudio, get_my_mmaudio | |
from mmaudio.model.utils.features_utils import FeaturesUtils | |
_torch_thread.backends.cuda.matmul.allow_tf32 = True | |
_torch_thread.backends.cudnn.allow_tf32 = True | |
_device = _torch_thread.device(gfx_device) | |
_dtype = _torch_thread.bfloat16 if _device.type == 'cuda' else _torch_thread.float32 | |
_model_config_name = job_data.get('mmaudio_model_name', 'large_44k_v2') | |
_model_config: ModelConfig = all_model_cfg[_model_config_name] | |
_model_config.download_if_needed() | |
_scheduler_config = _model_config.seq_cfg | |
_model: MMAudio = get_my_mmaudio(_model_config.model_name).to(_device, _dtype).eval() | |
_model.load_weights(_torch_thread.load(_model_config.model_path, map_location=_device, weights_only=True)) | |
_feature_extractor = FeaturesUtils( | |
tod_vae_ckpt=_model_config.vae_path, | |
synchformer_ckpt=_model_config.synchformer_ckpt, | |
enable_conditions=True, | |
mode=_model_config.mode, | |
bigvgan_vocoder_ckpt=_model_config.bigvgan_16k_path, | |
need_vae_encoder=False | |
).to(_device, _dtype).eval() | |
pipe = {'model': _model, 'feature_extractor': _feature_extractor, 'scheduler_config': _scheduler_config, 'model_config': _model_config, 'device': _device, 'dtype': _dtype} | |
elif audio_model_card == "facebook/musicgen-stereo-melody-large": | |
from transformers import pipeline as hf_pipeline | |
from transformers import set_seed | |
pipe = hf_pipeline("text-to-audio", audio_model_card, device=gfx_device, torch_dtype=_torch_thread.float16, local_files_only=local_files_only) | |
elif audio_model_card == "cvssp/audioldm2-large": | |
from diffusers import AudioLDM2Pipeline | |
pipe = AudioLDM2Pipeline.from_pretrained(audio_model_card, torch_dtype=_torch_thread.float16, local_files_only=local_files_only) | |
if low_vram(): pipe.enable_model_cpu_offload() | |
else: pipe.to(gfx_device) | |
else: | |
raise ValueError(f"Audio model card not supported or not found: {audio_model_card}") | |
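# A torch.Generator is created on the GPU when CUDA is in use, otherwise on the CPU;
# a seed of 0 leaves `generator` as None so each pipeline falls back to its default randomness.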
if _torch_thread.cuda.is_available() and gfx_device != "cpu": | |
generator = _torch_thread.Generator(gfx_device).manual_seed(seed) if seed != 0 else None | |
else: | |
generator = _torch_thread.Generator("cpu").manual_seed(seed) if seed != 0 else None | |
start_time_inference = time.time() | |
if audio_model_card == "stabilityai/stable-audio-open-1.0": | |
audio = pipe( | |
prompt=prompt_text, | |
negative_prompt=negative_prompt_text if negative_prompt_text else None, | |
num_inference_steps=movie_num_inference_steps, | |
guidance_scale=movie_num_guidance, | |
num_waveforms_per_prompt=1, | |
generator=generator, | |
).audios[0] | |
if audio.ndim == 1: audio = audio.unsqueeze(0) | |
output_np = audio.cpu().numpy() | |
sample_rate = pipe.config.sampling_rate | |
write_wav(output_filepath, sample_rate, output_np.T) | |
elif audio_model_card == "bark": | |
import numpy as np | |
from bark import SAMPLE_RATE | |
from bark.generation import generate_text_semantic | |
from bark.api import semantic_to_waveform | |
rate = SAMPLE_RATE | |
SPEAKER = job_data.get('bark_speaker_full_id', "v2/en_speaker_0") | |
sentences = [s.strip() for s in prompt_text.replace("\n", " ").split('.') if s.strip()] | |
if not sentences: sentences = [prompt_text] | |
pieces = [] | |
silence_duration_samples = int(0.25 * rate) | |
silence_np = np.zeros(silence_duration_samples, dtype=np.float32) | |
for sentence_chunk in sentences: | |
if not sentence_chunk: continue | |
semantic_tokens = generate_text_semantic( | |
sentence_chunk, history_prompt=SPEAKER, temp=0.7, min_eos_p=0.05 | |
) | |
audio_array = semantic_to_waveform(semantic_tokens, history_prompt=SPEAKER) | |
pieces.append(audio_array) | |
pieces.append(silence_np) | |
if pieces: | |
if pieces and np.array_equal(pieces[-1], silence_np): | |
pieces.pop() | |
if pieces: | |
audio_bark = np.concatenate(pieces) | |
write_wav(output_filepath, rate, audio_bark) | |
else: | |
raise RuntimeError("Bark generated no audio after processing sentences.") | |
else: | |
raise RuntimeError("Bark generated no audio pieces.") | |
elif audio_model_card == "WhisperSpeech": | |
speaker_path = job_data.get('whisper_speaker_path') | |
lang = job_data.get('whisper_lang', 'en') | |
cps = job_data.get('whisper_cps', 15) | |
audio_tensor = pipe.generate(prompt_text, speaker=speaker_path, lang=lang, cps=cps) | |
torchaudio.save(output_filepath, audio_tensor.cpu(), pipe.fs) | |
elif audio_model_card in ("parler-tts/parler-tts-large-v1", "parler-tts/parler-tts-mini-v1"): | |
input_ids = tokenizer(parler_direction_prompt, return_tensors="pt").input_ids.to(gfx_device) | |
prompt_input_ids = tokenizer(prompt_text, return_tensors="pt").input_ids.to(gfx_device) | |
generation = pipe.generate( | |
input_ids=input_ids, | |
prompt_input_ids=prompt_input_ids, | |
do_sample=True, | |
temperature=0.7, | |
).to(_torch_thread.float32) | |
audio_arr = generation.cpu().numpy().squeeze() | |
if audio_arr.ndim == 2 and audio_arr.shape[0] < audio_arr.shape[1]: | |
audio_arr = audio_arr.T | |
write_wav(output_filepath, pipe.config.sampling_rate, audio_arr) | |
elif audio_model_card == "facebook/musicgen-stereo-melody-large": | |
from transformers import set_seed | |
set_seed(seed) | |
max_tokens_for_duration = int(min(audio_length_in_s * 25, 1500)) | |
music = pipe( | |
prompt_text, | |
negative_prompt=negative_prompt_text if negative_prompt_text else None, | |
guidance_scale=movie_num_guidance, | |
forward_params={"max_new_tokens": max_tokens_for_duration} | |
) | |
audio_data = music["audio"][0].T | |
write_wav(output_filepath, music["sampling_rate"], audio_data.cpu().numpy()) | |
elif audio_model_card == "cvssp/audioldm2-large": | |
audio = pipe( | |
prompt_text, | |
negative_prompt=negative_prompt_text if negative_prompt_text else None, | |
num_inference_steps=movie_num_inference_steps, | |
audio_length_in_s=audio_length_in_s, | |
guidance_scale=movie_num_guidance, | |
generator=generator, | |
).audios[0] | |
audio_to_write = audio.cpu().numpy() | |
if audio_to_write.ndim == 2 and audio_to_write.shape[0] < audio_to_write.shape[1]: | |
audio_to_write = audio_to_write.T | |
sample_rate = getattr(pipe.scheduler.config, "sample_rate", | |
getattr(pipe.config, "sample_rate", 16000)) | |
write_wav(output_filepath, sample_rate, audio_to_write) | |
elif audio_model_card == "MMAudio": | |
_model = pipe['model'] | |
_feature_extractor = pipe['feature_extractor'] | |
_scheduler_config = pipe['scheduler_config'] | |
_model_config = pipe['model_config'] | |
_device = pipe['device'] | |
_dtype = pipe['dtype'] | |
_scheduler = FlowMatching(min_sigma=0, inference_mode='euler', num_steps=movie_num_inference_steps) | |
_video_data = None | |
_clip_frames_for_generate, _sync_frames_for_generate = None, None | |
_image_input_flag = False | |
mmaudio_sr = getattr(_model_config, 'sr', 44100) | |
_fps_num_from_job = job_data.get('fps_numerator', 24) | |
_fps_den_from_job = job_data.get('fps_denominator', 1.0) | |
if abs(_fps_den_from_job - 1.0) < 0.00001: | |
_fps_fraction_num_int = _fps_num_from_job | |
_fps_fraction_den_int = 1 | |
elif abs(_fps_den_from_job - 1.001) < 0.00001: | |
_fps_fraction_num_int = int(round(_fps_num_from_job * 1000)) | |
_fps_fraction_den_int = 1001 | |
else: | |
combined_frac = Fraction(_fps_num_from_job / _fps_den_from_job).limit_denominator(10000) | |
_fps_fraction_num_int = combined_frac.numerator | |
_fps_fraction_den_int = combined_frac.denominator | |
if _fps_fraction_den_int == 0: _fps_fraction_den_int = 1 | |
_fps_fraction = Fraction(_fps_fraction_num_int, _fps_fraction_den_int) | |
print(f"MMAudio using FPS fraction: {_fps_fraction}") | |
is_tta_case = not (input_movie_path and os.path.isfile(input_movie_path)) and \ | |
not (input_image_path and os.path.isfile(input_image_path)) | |
if not is_tta_case: | |
if input_movie_path and os.path.isfile(input_movie_path): | |
_video_data = load_video(input_movie_path, audio_length_in_s) | |
_clip_frames_for_generate = _video_data.clip_frames.unsqueeze(0).to(_device, _dtype) | |
_sync_frames_for_generate = _video_data.sync_frames.unsqueeze(0).to(_device, _dtype) | |
_scheduler_config.duration = _video_data.duration_sec | |
elif input_image_path and os.path.isfile(input_image_path): | |
_image_data = load_image(input_image_path) | |
_clip_frames_for_generate = _image_data.clip_frames.unsqueeze(0).to(_device, _dtype) | |
_sync_frames_for_generate = _image_data.sync_frames.unsqueeze(0).to(_device, _dtype) | |
_video_data = VideoInfo.from_image_info(_image_data, audio_length_in_s, fps=_fps_fraction) | |
_scheduler_config.duration = audio_length_in_s | |
_image_input_flag = True | |
else: | |
_scheduler_config.duration = audio_length_in_s | |
dummy_frame_shape = (1, 3, 224, 224) | |
_dummy_frame_tensor_batched = _torch_thread.zeros(dummy_frame_shape, device=_device, dtype=_dtype) | |
_squeezed_dummy_frame = _dummy_frame_tensor_batched.squeeze(0) | |
if not _video_data: | |
_video_data = VideoInfo(all_frames=_squeezed_dummy_frame, | |
clip_frames=_squeezed_dummy_frame, | |
sync_frames=_squeezed_dummy_frame, | |
duration_sec=audio_length_in_s, | |
fps=_fps_fraction) | |
_model.update_seq_lengths(_scheduler_config.latent_seq_len, _scheduler_config.clip_seq_len, _scheduler_config.sync_seq_len) | |
with _torch_thread.no_grad(): | |
keyword_args = { | |
"feature_utils": _feature_extractor, | |
"net": _model, | |
"fm": _scheduler, | |
"rng": generator, | |
"cfg_strength": movie_num_guidance, | |
"image_input": _image_input_flag | |
} | |
if is_tta_case: | |
if negative_prompt_text: | |
keyword_args["negative_prompt"] = [negative_prompt_text] | |
generated_audio = generate(None, None, [prompt_text], **keyword_args) | |
else: | |
keyword_args["text_prompt"] = [prompt_text] | |
if negative_prompt_text: | |
keyword_args["negative_text"] = [negative_prompt_text] | |
if _clip_frames_for_generate is not None: | |
keyword_args["video_frames"] = _clip_frames_for_generate | |
if _sync_frames_for_generate is not None: | |
keyword_args["sync_frames"] = _sync_frames_for_generate | |
generated_audio = generate(**keyword_args) | |
audio_output = generated_audio.float().cpu()[0] | |
target_sr = mmaudio_sr | |
print(f"MMAudio using target sample rate: {target_sr}") | |
if output_filepath.endswith(".mp4"): | |
make_video(_video_data, output_filepath, audio_output, sampling_rate=target_sr) | |
print(f"MMAudio Saved video to {output_filepath}") | |
else: | |
if audio_output.ndim == 1: audio_output = audio_output.unsqueeze(0) | |
torchaudio.save(output_filepath, audio_output, target_sr) | |
print(f"MMAudio Saved audio to {output_filepath}") | |
else: | |
raise NotImplementedError(f"Model {audio_model_card} inference not fully implemented in threaded task.") | |
print_elapsed_time(start_time_inference) | |
with AUDIO_JOB_LOCK: | |
COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE.append({'id': job_id, 'status': 'COMPLETED', 'filepath': output_filepath, 'error': None}) | |
except ModuleNotFoundError as e: | |
error_message_detail = f"Module not found: {e.name}. Please install dependencies." | |
print(f"ERROR in job {job_id}: {error_message_detail}") | |
with AUDIO_JOB_LOCK: | |
COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE.append({'id': job_id, 'status': 'ERROR', 'filepath': None, 'error': error_message_detail}) | |
except Exception as e: | |
print(f"ERROR in job {job_id}: {e}") | |
import traceback | |
error_message_detail = f"{type(e).__name__}: {str(e)}. Check console." | |
traceback.print_exc() | |
with AUDIO_JOB_LOCK: | |
COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE.append({'id': job_id, 'status': 'ERROR', 'filepath': None, 'error': error_message_detail}) | |
finally: | |
del pipe | |
if tokenizer is not None: del tokenizer | |
if '_model' in locals() and audio_model_card == "MMAudio": | |
del _model, _feature_extractor, _scheduler_config, _model_config | |
print(f"Finished audio generation task for job: {job_id}") | |
# --- Property Group for Queue Items --- | |
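# AudioQueueItem stores a frozen per-job copy of every setting (prompt, seed, model card,
# device, model-specific options), so later UI changes do not affect jobs already queued;
# the worker thread receives these values as a plain dict snapshot.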
class AudioQueueItem(PropertyGroup): | |
id: StringProperty(name="ID") | |
name: StringProperty(name="Job Name", default="Audio Job") | |
status: EnumProperty( | |
name="Status", | |
items=[ | |
("PENDING", "Pending", "Waiting to be processed"), | |
("PROCESSING", "Processing", "Currently generating audio"), | |
("COMPLETED", "Completed", "Audio generated successfully"), | |
("ERROR", "Error", "An error occurred during generation"), | |
], | |
default="PENDING", | |
) | |
prompt: StringProperty(name="Prompt", default="A beautiful melody", maxlen=1024) | |
negative_prompt: StringProperty(name="Negative Prompt", maxlen=1024) | |
inference_steps: IntProperty(name="Inference Steps", default=50, min=1, max=500) | |
guidance_scale: FloatProperty(name="Guidance Scale", default=7.0, min=0.0, max=50.0) | |
seed: IntProperty(name="Seed", default=0) | |
use_random_seed: BoolProperty(name="Use Random Seed", default=True) | |
audio_length_s: FloatProperty(name="Audio Length (s)", default=10.0, min=0.1, max=600.0) | |
output_filepath: StringProperty(name="Output Filepath", subtype='FILE_PATH', maxlen=1024) | |
start_frame: IntProperty(name="Start Frame", default=1, min=1) | |
channel: IntProperty(name="Channel", default=1, min=1) | |
audio_model_card: StringProperty(name="Audio Model Card", maxlen=256) | |
local_files_only: BoolProperty(name="Local Files Only") | |
gfx_device: StringProperty(name="Graphics Device", default="cpu", maxlen=32) | |
bark_speaker_full_id: StringProperty(name="Bark Speaker ID", default="v2/en_speaker_0", maxlen=64) | |
whisper_model_ref: StringProperty(name="WhisperSpeech Model Ref", default="collabora/whisperspeech:s2a-q4-small-en+pl.model", maxlen=256) | |
whisper_speaker_path: StringProperty(name="WhisperSpeech Speaker Path (.pt)", subtype='FILE_PATH', maxlen=1024) | |
whisper_lang: StringProperty(name="WhisperSpeech Language", default="en", maxlen=10) | |
whisper_cps: IntProperty(name="WhisperSpeech CPS", default=15, min=5, max=30) | |
parler_direction_prompt: StringProperty(name="Parler Direction Prompt", default="A clear, expressive voice speaking calmly.", maxlen=512) | |
mmaudio_model_name: StringProperty(name="MMAudio Model Name", default="large_44k_v2", maxlen=128) | |
input_movie_path: StringProperty(name="MMAudio Input Movie", subtype='FILE_PATH', maxlen=1024) | |
input_image_path: StringProperty(name="MMAudio Input Image", subtype='FILE_PATH', maxlen=1024) | |
fps_numerator: IntProperty(name="FPS Numerator", default=24) | |
fps_denominator: FloatProperty(name="FPS Denominator", default=1.0) | |
error_message: StringProperty(name="Error Message", default="", maxlen=1024) | |
# --- Scene Properties --- | |
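# These Scene-level properties back the N-panel UI; "Add to Audio Queue" snapshots them
# into a new AudioQueueItem, so they only act as defaults for the next job that is added.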
def register_scene_properties(): | |
bpy.types.Scene.audio_gen_prompt = StringProperty(name="Prompt", default="A catchy synthwave tune", maxlen=1024) | |
bpy.types.Scene.audio_gen_negative_prompt = StringProperty(name="Negative Prompt", default="", maxlen=1024) | |
bpy.types.Scene.audio_gen_steps = IntProperty(name="Inference Steps", default=50, min=10, max=500) | |
bpy.types.Scene.audio_gen_guidance = FloatProperty(name="Guidance Scale", default=7.0, min=0.0, max=50.0) | |
bpy.types.Scene.audio_gen_seed = IntProperty(name="Seed", default=12345, min=0) | |
bpy.types.Scene.audio_gen_random_seed = BoolProperty(name="Use Random Seed", default=True) | |
bpy.types.Scene.audio_gen_length_s = FloatProperty(name="Duration (s)", default=10.0, min=1.0, max=600.0) | |
bpy.types.Scene.audio_gen_output_dir = StringProperty(name="Output Directory", subtype='DIR_PATH', default="//audio_gen/", maxlen=1024) | |
bpy.types.Scene.audio_gen_bark_speaker = StringProperty(name="Bark Speaker", default="v2/en_speaker_0", maxlen=64) | |
bpy.types.Scene.audio_gen_parler_description = StringProperty(name="Parler Description", default="A clear, expressive voice speaking calmly.", maxlen=512) | |
bpy.types.Scene.audio_gen_mmaudio_movie_input = StringProperty(name="MMAudio Movie Input", subtype='FILE_PATH', maxlen=1024) | |
bpy.types.Scene.audio_gen_mmaudio_image_input = StringProperty(name="MMAudio Image Input", subtype='FILE_PATH', maxlen=1024) | |
bpy.types.Scene.audio_gen_whisper_speaker_file = StringProperty(name="WhisperSpeech Speaker File (.pt)", subtype='FILE_PATH', maxlen=1024) | |
bpy.types.Scene.audio_render_queue = CollectionProperty(type=AudioQueueItem) | |
bpy.types.Scene.audio_render_queue_index = IntProperty() | |
def unregister_scene_properties(): | |
props_to_delete = [ | |
"audio_gen_prompt", "audio_gen_negative_prompt", "audio_gen_steps", | |
"audio_gen_guidance", "audio_gen_seed", "audio_gen_random_seed", | |
"audio_gen_length_s", "audio_gen_output_dir", "audio_gen_bark_speaker", | |
"audio_gen_parler_description", "audio_gen_mmaudio_movie_input", | |
"audio_gen_mmaudio_image_input", "audio_gen_whisper_speaker_file", | |
"audio_render_queue", "audio_render_queue_index" | |
] | |
for prop_name in props_to_delete: | |
if hasattr(bpy.types.Scene, prop_name): | |
delattr(bpy.types.Scene, prop_name) | |
# --- Operators --- | |
class SEQUENCER_OT_add_to_audio_queue(Operator): | |
bl_idname = "sequencer.add_to_audio_queue" | |
bl_label = "Add to Audio Queue" | |
bl_options = {"REGISTER", "UNDO"} | |
@classmethod | |
def poll(cls, context): return True | |
def execute(self, context): | |
scene = context.scene | |
addon_name = get_addon_module_name() | |
prefs = context.preferences.addons[addon_name].preferences | |
if not scene.sequence_editor: scene.sequence_editor_create() | |
item = scene.audio_render_queue.add() | |
item.id = str(uuid.uuid4()) | |
item.prompt = scene.audio_gen_prompt | |
item.name = item.prompt[:30] + "..." if len(item.prompt) > 30 else item.prompt | |
item.negative_prompt = scene.audio_gen_negative_prompt | |
item.inference_steps = scene.audio_gen_steps | |
item.guidance_scale = scene.audio_gen_guidance | |
item.use_random_seed = scene.audio_gen_random_seed | |
if item.use_random_seed: | |
item.seed = random.randint(0, 2**31 - 1) | |
else: | |
item.seed = scene.audio_gen_seed | |
item.audio_length_s = scene.audio_gen_length_s | |
item.start_frame = scene.frame_current | |
item.channel = find_first_empty_channel(item.start_frame, item.start_frame + int(item.audio_length_s * (scene.render.fps / scene.render.fps_base) +1)) | |
item.audio_model_card = prefs.audio_model_card | |
item.local_files_only = prefs.local_files_only | |
item.gfx_device = "cpu" | |
if torch: | |
try: | |
if prefs.execution_provider == "CUDAExecutionProvider": | |
if torch.cuda.is_available(): item.gfx_device = "cuda" | |
else: self.report({'WARNING'}, "CUDA selected, but not available. Using CPU.") | |
elif prefs.execution_provider == "CoreMLExecutionProvider": | |
if hasattr(torch, "backends") and hasattr(torch.backends, "mps") and torch.backends.mps.is_available(): item.gfx_device = "mps" | |
else: self.report({'WARNING'}, "CoreML/MPS selected, but not available. Using CPU.") | |
except Exception as e: self.report({'WARNING'}, f"Error checking GPU devices: {e}. Using CPU.") | |
elif prefs.execution_provider != "CPUExecutionProvider": | |
self.report({'WARNING'}, "PyTorch not found. GPU execution unavailable. Using CPU.") | |
if item.audio_model_card == "bark": item.bark_speaker_full_id = scene.audio_gen_bark_speaker | |
elif item.audio_model_card == "WhisperSpeech": item.whisper_speaker_path = solve_path(scene.audio_gen_whisper_speaker_file) if scene.audio_gen_whisper_speaker_file else "" | |
elif item.audio_model_card in ("parler-tts/parler-tts-large-v1", "parler-tts/parler-tts-mini-v1"): item.parler_direction_prompt = scene.audio_gen_parler_description | |
elif item.audio_model_card == "MMAudio": | |
item.input_movie_path = solve_path(scene.audio_gen_mmaudio_movie_input) if scene.audio_gen_mmaudio_movie_input else "" | |
item.input_image_path = solve_path(scene.audio_gen_mmaudio_image_input) if scene.audio_gen_mmaudio_image_input else "" | |
item.fps_numerator = scene.render.fps | |
item.fps_denominator = scene.render.fps_base | |
output_dir = solve_path(scene.audio_gen_output_dir) | |
if not os.path.isdir(output_dir): | |
try: os.makedirs(output_dir, exist_ok=True) | |
except OSError as e: | |
self.report({'ERROR'}, f"Could not create output directory: {output_dir} - {e}") | |
scene.audio_render_queue.remove(len(scene.audio_render_queue)-1) | |
return {'CANCELLED'} | |
base_filename = clean_filename(f"{item.seed}_{item.prompt}") | |
file_extension = ".wav" | |
if item.audio_model_card == "MMAudio" and (item.input_movie_path or item.input_image_path): | |
file_extension = ".mp4" | |
item.output_filepath = os.path.join(output_dir, f"{base_filename}{file_extension}") | |
item.error_message = "" | |
self.report({"INFO"}, f"Added '{item.name}' to audio queue.") | |
return {"FINISHED"} | |
class SEQUENCER_OT_process_audio_queue(Operator): | |
bl_idname = "sequencer.process_audio_queue" | |
bl_label = "Process Queue" | |
bl_options = {"REGISTER"} | |
_timer = None | |
_previous_model_card = None | |
@classmethod | |
def poll(cls, context): | |
has_pending = any(job.status == "PENDING" for job in context.scene.audio_render_queue) | |
return has_pending and CURRENTLY_PROCESSING_JOB_ID is None | |
def _start_job_thread(self, job_item): | |
job_data = {k: getattr(job_item, k) for k in job_item.rna_type.properties.keys() if k != 'rna_type'} | |
job_data['output_filepath'] = solve_path(job_item.output_filepath) | |
if job_item.input_movie_path: job_data['input_movie_path'] = solve_path(job_item.input_movie_path) | |
if job_item.input_image_path: job_data['input_image_path'] = solve_path(job_item.input_image_path) | |
if job_item.whisper_speaker_path: job_data['whisper_speaker_path'] = solve_path(job_item.whisper_speaker_path) | |
thread = threading.Thread(target=_generate_audio_task, args=(job_data,)) | |
thread.daemon = True | |
thread.start() | |
print(f"Processing thread started for job: {job_item.name}") | |
def _start_next_pending_job(self, context): | |
global CURRENTLY_PROCESSING_JOB_ID | |
scene = context.scene | |
next_job_to_process = None | |
active_job_idx = -1 | |
for idx, job_in_q in enumerate(scene.audio_render_queue): | |
if job_in_q.status == "PENDING": | |
next_job_to_process = job_in_q | |
active_job_idx = idx | |
break | |
if next_job_to_process: | |
print(f"Preparing to start next pending job: {next_job_to_process.name}") | |
if SEQUENCER_OT_process_audio_queue._previous_model_card is not None and \ | |
SEQUENCER_OT_process_audio_queue._previous_model_card != next_job_to_process.audio_model_card: | |
print(f"Model changed from {SEQUENCER_OT_process_audio_queue._previous_model_card} to {next_job_to_process.audio_model_card}. Clearing VRAM.") | |
clear_cuda_cache() | |
SEQUENCER_OT_process_audio_queue._previous_model_card = next_job_to_process.audio_model_card | |
CURRENTLY_PROCESSING_JOB_ID = next_job_to_process.id | |
next_job_to_process.status = "PROCESSING" | |
next_job_to_process.error_message = "" | |
if active_job_idx != -1: | |
scene.audio_render_queue_index = active_job_idx | |
self._start_job_thread(next_job_to_process) | |
for area in context.screen.areas: | |
if area.type == 'SEQUENCE_EDITOR' or area.ui_type == 'PROPERTIES': | |
area.tag_redraw() | |
return True | |
return False | |
def execute(self, context): | |
if self._start_next_pending_job(context): | |
if SEQUENCER_OT_process_audio_queue._timer is None: | |
SEQUENCER_OT_process_audio_queue._timer = bpy.context.window_manager.event_timer_add(0.5, window=context.window) | |
context.window_manager.modal_handler_add(self) | |
current_job_name = "Unknown" | |
if 0 <= context.scene.audio_render_queue_index < len(context.scene.audio_render_queue): | |
current_job_name = context.scene.audio_render_queue[context.scene.audio_render_queue_index].name | |
self.report({"INFO"}, f"Started processing job: {current_job_name}") | |
return {"RUNNING_MODAL"} | |
else: | |
self.report({"INFO"}, "No pending jobs to process.") | |
SEQUENCER_OT_process_audio_queue._previous_model_card = None | |
return {"CANCELLED"} | |
def modal(self, context, event): | |
global COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE, CURRENTLY_PROCESSING_JOB_ID | |
scene = context.scene | |
if event.type == 'TIMER': | |
callbacks_to_process = [] | |
with AUDIO_JOB_LOCK: | |
if COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE: | |
callbacks_to_process.extend(COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE) | |
COMPLETED_AUDIO_JOBS_CALLBACK_QUEUE.clear() | |
job_just_finished_this_cycle = False | |
for result in callbacks_to_process: | |
job_id_completed = result['id'] | |
if CURRENTLY_PROCESSING_JOB_ID == job_id_completed: | |
CURRENTLY_PROCESSING_JOB_ID = None | |
job_just_finished_this_cycle = True | |
job_item = next((job for job in scene.audio_render_queue if job.id == job_id_completed), None) | |
if not job_item: | |
print(f"Job ID {job_id_completed} not found in queue after completion.") | |
continue | |
job_item.status = result['status'] | |
if result['status'] == 'COMPLETED': | |
job_item.output_filepath = solve_path(result['filepath']) | |
print(f"Job '{job_item.name}' completed. Adding strip.") | |
self._add_strip_to_vse(context, job_item) | |
else: | |
error_msg = result.get('error', 'Unknown error') | |
print(f"Job '{job_item.name}' failed: {error_msg}") | |
job_item.error_message = error_msg | |
for area in context.screen.areas: | |
if area.type == 'SEQUENCE_EDITOR' or area.ui_type == 'PROPERTIES': | |
area.tag_redraw() | |
if job_just_finished_this_cycle: | |
if not self._start_next_pending_job(context): | |
print("All jobs processed or queue processing stopped.") | |
if SEQUENCER_OT_process_audio_queue._timer: | |
bpy.context.window_manager.event_timer_remove(SEQUENCER_OT_process_audio_queue._timer) | |
SEQUENCER_OT_process_audio_queue._timer = None | |
SEQUENCER_OT_process_audio_queue._previous_model_card = None | |
return {'FINISHED'} | |
if event.type in {'RIGHTMOUSE', 'ESC'}: | |
if SEQUENCER_OT_process_audio_queue._timer: | |
bpy.context.window_manager.event_timer_remove(SEQUENCER_OT_process_audio_queue._timer) | |
SEQUENCER_OT_process_audio_queue._timer = None | |
if CURRENTLY_PROCESSING_JOB_ID:
self.report({'WARNING'}, f"Queue processing interrupted. Job {CURRENTLY_PROCESSING_JOB_ID} may finish in background.")
else:
self.report({'INFO'}, "Queue processing stopped.")
SEQUENCER_OT_process_audio_queue._previous_model_card = None | |
return {'CANCELLED'} | |
return {'PASS_THROUGH'} | |
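# _add_strip_to_vse() runs on the main thread from modal(): it verifies the generated file
# exists, then adds it as a movie strip (.mp4) or sound strip at the job's recorded channel
# and start frame.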
def _add_strip_to_vse(self, context, job_item): | |
scene = context.scene | |
seq_editor = scene.sequence_editor | |
if not seq_editor: seq_editor = scene.sequence_editor_create() | |
abs_filepath = solve_path(job_item.output_filepath) | |
if not os.path.exists(abs_filepath): | |
error_msg = f"Generated file not found: {abs_filepath}" | |
job_item.status = "ERROR" | |
job_item.error_message = error_msg | |
print(error_msg) | |
return | |
try: | |
if abs_filepath.lower().endswith(".mp4"): | |
new_strip = seq_editor.sequences.new_movie( | |
name=job_item.name, filepath=abs_filepath, | |
channel=job_item.channel, frame_start=job_item.start_frame, | |
) | |
else: | |
new_strip = seq_editor.sequences.new_sound( | |
name=job_item.name, filepath=abs_filepath, | |
channel=job_item.channel, frame_start=job_item.start_frame, | |
) | |
if new_strip: seq_editor.active_strip = new_strip | |
bpy.ops.wm.redraw_timer(type='DRAW_WIN_SWAP', iterations=1) | |
except Exception as e: | |
error_msg = f"Failed to add strip for {job_item.name}: {e}" | |
job_item.status = "ERROR" | |
job_item.error_message = error_msg | |
print(error_msg) | |
class SEQUENCER_OT_clear_audio_queue(Operator): | |
bl_idname = "sequencer.clear_audio_queue" | |
bl_label = "Clear Queue" | |
bl_options = {"REGISTER", "UNDO"} | |
@classmethod | |
def poll(cls, context): | |
return len(context.scene.audio_render_queue) > 0 and CURRENTLY_PROCESSING_JOB_ID is None | |
def execute(self, context): | |
if CURRENTLY_PROCESSING_JOB_ID is not None: | |
self.report({"WARNING"}, "Cannot clear queue while a job is processing.") | |
return {"CANCELLED"} | |
context.scene.audio_render_queue.clear() | |
context.scene.audio_render_queue_index = 0 | |
self.report({"INFO"}, "Audio queue cleared.") | |
return {"FINISHED"} | |
class SEQUENCER_OT_remove_from_audio_queue(Operator): | |
bl_idname = "sequencer.remove_from_audio_queue" | |
bl_label = "Remove Job" | |
bl_options = {"REGISTER", "UNDO"} | |
index: IntProperty(name="Job Index") | |
@classmethod | |
def poll(cls, context): | |
return len(context.scene.audio_render_queue) > 0 and CURRENTLY_PROCESSING_JOB_ID is None | |
def execute(self, context): | |
scene = context.scene | |
if not (0 <= self.index < len(scene.audio_render_queue)): | |
self.report({'WARNING'}, "Invalid job index.") | |
return {"CANCELLED"} | |
job_to_remove = scene.audio_render_queue[self.index] | |
if job_to_remove.status == "PROCESSING": | |
self.report({"WARNING"}, "Cannot remove a job marked as processing.") | |
return {"CANCELLED"} | |
scene.audio_render_queue.remove(self.index) | |
if scene.audio_render_queue_index >= self.index and scene.audio_render_queue_index > 0: | |
scene.audio_render_queue_index -=1 | |
elif not scene.audio_render_queue: | |
scene.audio_render_queue_index = 0 | |
self.report({"INFO"}, f"Job '{job_to_remove.name}' removed from queue.") | |
return {"FINISHED"} | |
# --- UI Panel --- | |
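# The panel lives in the VSE sidebar ("AI Tools" tab). It exposes the shared generation
# settings, model-specific sub-boxes, and a per-job queue listing with status icons, the
# output path on success, and the error message on failure.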
class SEQUENCER_PT_audio_generation_panel(Panel): | |
bl_label = "AI Audio Generation" | |
bl_space_type = "SEQUENCE_EDITOR" | |
bl_region_type = "UI" | |
bl_category = "AI Tools" | |
def draw(self, context): | |
layout = self.layout | |
scene = context.scene | |
addon_name = get_addon_module_name() | |
try: | |
prefs = context.preferences.addons[addon_name].preferences | |
except KeyError: | |
layout.label(text=f"Addon '{addon_name}' preferences not found. Ensure it's installed and enabled correctly.", icon='ERROR') | |
return | |
layout.label(text="Audio Generation Settings:") | |
col = layout.column(align=True) | |
col.prop(prefs, "audio_model_card", text="Model") | |
col.prop(scene, "audio_gen_prompt", text="Prompt") | |
col.prop(scene, "audio_gen_negative_prompt", text="Negative") | |
row = col.row(align=True); row.prop(scene, "audio_gen_steps", text="Steps"); row.prop(scene, "audio_gen_guidance", text="Guidance") | |
row = col.row(align=True); sub = row.row(align=True); sub.prop(scene, "audio_gen_seed", text="Seed"); sub.enabled = not scene.audio_gen_random_seed; row.prop(scene, "audio_gen_random_seed", text="", icon='QUESTION') | |
col.prop(scene, "audio_gen_length_s", text="Duration (s)") | |
col.prop(scene, "audio_gen_output_dir", text="Output Dir") | |
if prefs.audio_model_card == "bark": | |
box = layout.box(); box.label(text="Bark Settings:"); box.prop(scene, "audio_gen_bark_speaker", text="Speaker") | |
elif prefs.audio_model_card == "WhisperSpeech": | |
box = layout.box(); box.label(text="WhisperSpeech Settings:"); box.prop(scene, "audio_gen_whisper_speaker_file", text="Speaker .pt") | |
elif prefs.audio_model_card in ("parler-tts/parler-tts-large-v1", "parler-tts/parler-tts-mini-v1"): | |
box = layout.box(); box.label(text="Parler TTS Settings:"); box.prop(scene, "audio_gen_parler_description", text="Description") | |
elif prefs.audio_model_card == "MMAudio": | |
box = layout.box(); box.label(text="MMAudio Settings:"); box.prop(scene, "audio_gen_mmaudio_movie_input", text="Movie Input"); box.prop(scene, "audio_gen_mmaudio_image_input", text="Image Input") | |
layout.separator() | |
layout.operator(SEQUENCER_OT_add_to_audio_queue.bl_idname, icon="ADD") | |
layout.separator() | |
layout.label(text="Audio Render Queue:") | |
row = layout.row(align=True) | |
process_op_text = "Process Queue" | |
if CURRENTLY_PROCESSING_JOB_ID is not None or \ | |
(SEQUENCER_OT_process_audio_queue._timer is not None and any(j.status == "PENDING" for j in scene.audio_render_queue)): | |
process_op_text = "Processing..." | |
row.operator(SEQUENCER_OT_process_audio_queue.bl_idname, icon="PLAY", text=process_op_text) | |
row.operator(SEQUENCER_OT_clear_audio_queue.bl_idname, icon="TRASH", text="Clear All") | |
if CURRENTLY_PROCESSING_JOB_ID: | |
processing_job_item = next((job for job in scene.audio_render_queue if job.id == CURRENTLY_PROCESSING_JOB_ID), None) | |
if processing_job_item: | |
layout.label(text=f"Active: {processing_job_item.name[:25]}...", icon='INFO') | |
if not scene.audio_render_queue: layout.label(text="Queue is empty.") | |
else: | |
for i, item in enumerate(scene.audio_render_queue): | |
box = layout.box() | |
main_row = box.row() | |
icon = 'FILE_SOUND' | |
if item.status == "PROCESSING": icon = 'PLAY' | |
elif item.status == "COMPLETED": icon = 'CHECKMARK' | |
elif item.status == "ERROR": icon = 'ERROR' | |
elif item.status == "PENDING": icon = 'TIME' | |
main_row.label(text=f"{i+1}. {item.name}", icon=icon) | |
main_row.label(text=f"({item.status})") | |
op_row = main_row.row(align=True) | |
op_rem_props = op_row.operator(SEQUENCER_OT_remove_from_audio_queue.bl_idname, text="", icon="X") | |
if op_rem_props: | |
op_rem_props.index = i | |
if item.status == "COMPLETED": | |
box.prop(item, "output_filepath", text="File", emboss=False, icon='FILEBROWSER') | |
elif item.status == "ERROR": | |
if item.error_message: | |
box.label(text=item.error_message, icon='CANCEL') | |
else: | |
box.label(text="Failed. Check console for details.", icon='CANCEL') | |
# --- Addon Preferences --- | |
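# Addon preferences hold the model selection, the Hugging Face "local files only" toggle
# and the execution provider (CPU / CUDA / CoreML-MPS); these are read when a job is added
# to the queue, not at generation time.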
class MyAddonPreferences(bpy.types.AddonPreferences): | |
bl_idname = get_addon_module_name() | |
model_items = [ | |
("stabilityai/stable-audio-open-1.0", "Stable Audio Open 1.0", "Stability AI text-to-audio"), | |
("bark", "Bark", "Suno AI text-to-speech"), | |
("WhisperSpeech", "WhisperSpeech", "Collabora text-to-speech"), | |
("parler-tts/parler-tts-large-v1", "Parler TTS Large", "Parler TTS Large model"), | |
("facebook/musicgen-stereo-melody-large", "MusicGen Stereo Large", "Facebook AI music generation"), | |
("cvssp/audioldm2-large", "AudioLDM2 Large", "Audio generation model"), | |
("MMAudio", "MMAudio", "Multimodal Audio generation"), | |
] | |
audio_model_card: EnumProperty(name="Audio Model", items=model_items, default="stabilityai/stable-audio-open-1.0") | |
local_files_only: BoolProperty(name="Use Local Files Only (Hugging Face)", default=False) | |
exec_provider_items = [ | |
("CPUExecutionProvider", "CPU", "Use CPU"), | |
("CUDAExecutionProvider", "CUDA (NVIDIA)", "Use NVIDIA GPU (requires PyTorch+CUDA)"), | |
("CoreMLExecutionProvider", "CoreML (Apple MPS)", "Use Apple Silicon GPU (requires PyTorch+MPS)"), | |
] | |
execution_provider: EnumProperty(name="Execution Provider", items=exec_provider_items, default="CPUExecutionProvider") | |
def draw(self, context): | |
layout = self.layout | |
layout.prop(self, "audio_model_card") | |
layout.prop(self, "local_files_only") | |
layout.prop(self, "execution_provider") | |
layout.label(text="Note: Model dependencies (PyTorch, diffusers, etc.)") | |
layout.label(text="must be installed into Blender's Python environment.") | |
if not torch: layout.label(text="PyTorch (torch) library not found!", icon='ERROR') | |
# --- Registration --- | |
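# AudioQueueItem must be registered before the Scene CollectionProperty that uses it,
# hence the split between prop_group_classes and other_classes in register().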
prop_group_classes = (AudioQueueItem,) | |
other_classes = ( | |
MyAddonPreferences, | |
SEQUENCER_OT_add_to_audio_queue, | |
SEQUENCER_OT_process_audio_queue, | |
SEQUENCER_OT_clear_audio_queue, | |
SEQUENCER_OT_remove_from_audio_queue, | |
SEQUENCER_PT_audio_generation_panel, | |
) | |
def register(): | |
for cls in prop_group_classes: bpy.utils.register_class(cls) | |
register_scene_properties() | |
for cls in other_classes: bpy.utils.register_class(cls) | |
SEQUENCER_OT_process_audio_queue._previous_model_card = None | |
def unregister(): | |
for cls in reversed(other_classes): bpy.utils.unregister_class(cls) | |
unregister_scene_properties() | |
for cls in reversed(prop_group_classes): bpy.utils.unregister_class(cls) | |
if SEQUENCER_OT_process_audio_queue._timer: | |
try: | |
if bpy.context.window_manager and bpy.context.window: | |
bpy.context.window_manager.event_timer_remove(SEQUENCER_OT_process_audio_queue._timer) | |
except (AttributeError, RuntimeError) as e: print(f"Could not remove timer during unregister: {e}") | |
SEQUENCER_OT_process_audio_queue._timer = None | |
SEQUENCER_OT_process_audio_queue._previous_model_card = None | |
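# Running this file from Blender's Text Editor re-registers the addon in place:
# unregister() is attempted first so repeated runs do not raise "already registered".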
if __name__ == "__main__": | |
try: unregister() | |
except Exception as e: print(f"Error during pre-unregister: {e}") | |
register() |