A comprehensive system for capturing NDI streams, recording them, and processing the recordings with MoviePy for professional video production workflows.
# Install required packages
!pip install ndi-python opencv-python ipywidgets moviepy numpy pillow
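# Note: the NDI SDK/Runtime must be installed on your system before the Python bindings can load.
# NOTE(review): the `ndi-python` package is normally imported as `NDIlib`; confirm that the
# `ndi` module and the API used below match the binding you actually installed.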
import threading
import time
import cv2
import numpy as np
import ndi
import ipywidgets as widgets
from IPython.display import display, clear_output
import queue
from contextlib import contextmanager
import os
from datetime import datetime, timedelta
import tempfile
# MoviePy imports
from moviepy.editor import (
VideoFileClip, ImageSequenceClip, concatenate_videoclips,
vfx, afx, CompositeVideoClip, TextClip, ColorClip
)
from moviepy.video.fx import (
fadein, fadeout, resize, rotate, colorx,
lum_contrast, blur, mirror_x, mirror_y
)
# Enhanced state management for recording
class NDIMoviePyState:
    """State shared by the receiver, display and recording threads.

    Holds the queues the threads use to hand frames to each other, the
    events that signal "stop everything" and "recording active", the thread
    handles themselves, and the temporary directory that recorded segments
    are written to.
    """

    def __init__(self):
        # Preview frames; bounded to 10 entries.
        self.frame_queue = queue.Queue(maxsize=10)
        # Frames waiting to be added to the current recording segment (unbounded).
        self.recording_queue = queue.Queue()
        self.stop_event = threading.Event()
        self.recording_event = threading.Event()
        # Recording state
        self.is_recording = False
        self.recorded_frames = []
        self.recording_fps = 30
        self.recording_start_time = None
        # Threads
        self.receiver_thread = None
        self.display_thread = None
        self.recording_thread = None
        # File management
        self.temp_dir = tempfile.mkdtemp(prefix="ndi_recording_")
        self.recording_segments = []

    def start_recording(self):
        """Mark recording as active and remember when it started."""
        # Publish the start time *before* raising the flags: other threads
        # read recording_start_time as soon as is_recording is True, and
        # must never see None there.
        self.recorded_frames = []
        self.recording_start_time = time.time()
        self.is_recording = True
        self.recording_event.set()

    def stop_recording(self):
        """Mark recording as inactive."""
        self.is_recording = False
        self.recording_event.clear()

    def stop_all(self):
        """Signal every thread to stop and wait up to 2 seconds for each."""
        self.stop_event.set()
        self.stop_recording()
        # Join threads
        for thread in [self.receiver_thread, self.display_thread, self.recording_thread]:
            if thread and thread.is_alive():
                thread.join(timeout=2)

    def cleanup_temp_files(self):
        """Clean up temporary recording files (best effort, never raises for a missing directory)."""
        import shutil
        # ignore_errors keeps this best-effort without a bare `except:` that
        # would also swallow KeyboardInterrupt/SystemExit.
        shutil.rmtree(self.temp_dir, ignore_errors=True)


moviepy_state = NDIMoviePyState()
def ndi_receiver_with_recording(state):
    """Receive NDI frames and feed the display and recording queues.

    Connects to the first NDI source found, converts every frame to RGB,
    pushes it to ``state.frame_queue`` (replacing the oldest entry when the
    queue is full) and, while ``state.is_recording`` is true, pushes a copy
    to ``state.recording_queue`` at roughly ``state.recording_fps``.

    Runs until ``state.stop_event`` is set. Errors are printed, not raised.
    """
    try:
        if not ndi.initialize():
            print("Failed to initialize NDI")
            return
        print("NDI initialized successfully")

        # Find and connect to NDI source
        sources = ndi.find_sources(timeout=5000)
        if not sources:
            print("No NDI sources found")
            return
        print(f"Found {len(sources)} NDI sources:")
        for i, source in enumerate(sources):
            print(f" {i}: {source.name}")

        receiver = ndi.Receiver()
        receiver.connect(sources[0])
        print(f"Connected to: {sources[0].name}")

        frame_count = 0
        recording_frame_interval = 1.0 / state.recording_fps
        # Earliest time at which the next frame may be recorded.
        next_recording_time = 0.0

        while not state.stop_event.is_set():
            try:
                frame = receiver.read(timeout=1000)
                if frame is None:
                    continue

                frame_count += 1
                current_time = time.time()

                # Convert frame format
                if frame.shape[2] == 4:  # RGBA
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
                else:
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Add to display queue; when it is full, drop the oldest
                # frame so the preview always shows recent video.
                display_item = {
                    'frame': rgb_frame,
                    'timestamp': current_time,
                    'frame_number': frame_count,
                }
                try:
                    state.frame_queue.put_nowait(display_item)
                except queue.Full:
                    try:
                        state.frame_queue.get_nowait()
                    except queue.Empty:
                        pass  # the display thread emptied it in the meantime
                    try:
                        state.frame_queue.put_nowait(display_item)
                    except queue.Full:
                        pass  # lost the race for the free slot; skip this frame

                # Add to recording queue if recording and the next slot is due
                if state.is_recording and current_time >= next_recording_time:
                    try:
                        state.recording_queue.put_nowait({
                            'frame': rgb_frame.copy(),
                            'timestamp': current_time,
                            'frame_number': frame_count,
                        })
                    except queue.Full:
                        print("Recording queue full - dropping frame")
                    else:
                        # Advance the deadline by a fixed step instead of
                        # measuring from "now": with a source running at
                        # about the target rate, measuring from now rejects
                        # every frame that arrives slightly early, so far
                        # fewer frames are captured than the clip's fps
                        # assumes and the recording plays back too fast.
                        next_recording_time += recording_frame_interval
                        if current_time - next_recording_time > recording_frame_interval:
                            # Far behind (recording just started, or the
                            # source is slower than the target): resync.
                            next_recording_time = current_time + recording_frame_interval
            except Exception as e:
                print(f"Frame capture error: {e}")
                time.sleep(0.1)
    except Exception as e:
        print(f"NDI receiver error: {e}")
    finally:
        try:
            ndi.destroy()
        except Exception as e:
            # Best-effort shutdown; report instead of hiding the problem.
            print(f"NDI shutdown error: {e}")
        print("NDI receiver stopped")
def recording_processor(state):
    """Collect recorded frames and write each finished segment to disk.

    Frames are taken from ``state.recording_queue`` and buffered in memory.
    When recording has stopped and the queue is empty, the buffered frames
    are encoded to ``segment_<timestamp>.mp4`` inside ``state.temp_dir`` and
    a description of the segment is appended to ``state.recording_segments``.

    Runs until ``state.stop_event`` is set; frames still pending at that
    point are saved as a final segment.
    """
    segment_frames = []

    def flush_segment():
        """Encode the buffered frames as one segment and register it."""
        nonlocal segment_frames
        # Take ownership first so a failing write cannot be retried forever.
        frames, segment_frames = segment_frames, []
        if not frames:
            return
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        segment_path = os.path.join(state.temp_dir, f"segment_{timestamp}.mp4")
        print(f"Saving recording segment with {len(frames)} frames...")
        # Create video from frames
        clip = ImageSequenceClip(frames, fps=state.recording_fps)
        try:
            clip.write_videofile(segment_path, codec='libx264', audio=False, verbose=False, logger=None)
        finally:
            clip.close()
        state.recording_segments.append({
            'path': segment_path,
            'frames': len(frames),
            'duration': len(frames) / state.recording_fps,
            'timestamp': timestamp
        })
        print(f"Segment saved: {segment_path}")

    while not state.stop_event.is_set():
        try:
            # Only block waiting for "recording started" when there is
            # nothing left to do. Previously the loop skipped straight back
            # here whenever recording was off, so frames still queued or
            # buffered after "Stop Recording" were never saved.
            nothing_pending = not segment_frames and state.recording_queue.empty()
            if nothing_pending and not state.recording_event.wait(timeout=1):
                continue

            try:
                frame_data = state.recording_queue.get(timeout=0.1)
                segment_frames.append(frame_data['frame'])
            except queue.Empty:
                # Queue drained: if recording has stopped, the segment is complete
                if not state.is_recording:
                    flush_segment()
        except Exception as e:
            print(f"Recording processor error: {e}")
            time.sleep(0.1)

    # Shutting down: keep whatever was captured but not yet written.
    while True:
        try:
            segment_frames.append(state.recording_queue.get_nowait()['frame'])
        except queue.Empty:
            break
    try:
        flush_segment()
    except Exception as e:
        print(f"Recording processor error: {e}")
# --- Live preview ---------------------------------------------------------
image_widget = widgets.Image(
    format='jpeg',
    width=640,
    height=360,
)

# --- Status lines -----------------------------------------------------------
info_label = widgets.HTML(value="<b>Status:</b> Initializing...")
recording_info = widgets.HTML(value="<b>Recording:</b> Not recording")

# --- Stream / recording buttons ---------------------------------------------
start_button = widgets.Button(
    description="Start Stream",
    button_style='success',
    icon='play',
)
stop_button = widgets.Button(
    description="Stop Stream",
    button_style='danger',
    icon='stop',
)
record_button = widgets.Button(
    description="Start Recording",
    button_style='warning',
    icon='circle',
)
stop_record_button = widgets.Button(
    description="Stop Recording",
    button_style='info',
    icon='square',
)
snapshot_button = widgets.Button(
    description="Snapshot",
    button_style='primary',
    icon='camera',
)

# --- Recording settings -------------------------------------------------------
fps_slider = widgets.IntSlider(
    value=30,
    min=10,
    max=60,
    description='Recording FPS:',
)
quality_dropdown = widgets.Dropdown(
    options=['High', 'Medium', 'Low'],
    value='High',
    description='Quality:',
)

# --- Live effect selector -------------------------------------------------------
# The labels are matched verbatim by apply_moviepy_effect().
effect_dropdown = widgets.Dropdown(
    options=[
        'None',
        'Fade In/Out',
        'Color Enhance',
        'Blur Effect',
        'Mirror X',
        'Mirror Y',
        'Rotate 90ยฐ',
        'Black & White',
    ],
    value='None',
    description='Live Effect:',
)

# --- Layout -----------------------------------------------------------------------
controls_row1 = widgets.HBox(
    [start_button, stop_button, record_button, stop_record_button]
)
controls_row2 = widgets.HBox(
    [snapshot_button, fps_slider, quality_dropdown]
)
effects_row = widgets.HBox([effect_dropdown])
display_box = widgets.VBox(
    [
        info_label,
        recording_info,
        image_widget,
        controls_row1,
        controls_row2,
        effects_row,
    ]
)
display(display_box)

# --- Module-level state shared with the handlers and the display thread ---------
current_frame = None
frame_stats = dict(count=0, fps=0, last_time=time.time())
recording_stats = dict(frames=0, duration=0)
def apply_moviepy_effect(frame, effect_name):
    """Apply one of the live preview effects to a single frame.

    Args:
        frame: image array as produced by the receiver (RGB order is assumed
            by the 'Black & White' conversion).
        effect_name: label selected in the effect dropdown.

    Returns:
        The processed frame. The input frame itself is returned for 'None',
        for an unknown label, or when the effect raises (the error is printed).
    """
    if effect_name == 'None':
        return frame
    try:
        if effect_name == 'Fade In/Out':
            # Brightness oscillates between 0.4x and 1.0x over time
            fade_factor = 0.7 + 0.3 * np.sin(time.time() * 2)
            return (frame * fade_factor).astype(np.uint8)
        if effect_name == 'Color Enhance':
            # Scale by 1.3, add 10, saturate to 8 bits
            return cv2.convertScaleAbs(frame, alpha=1.3, beta=10)
        if effect_name == 'Blur Effect':
            return cv2.GaussianBlur(frame, (15, 15), 0)
        if effect_name == 'Mirror X':
            # Horizontal flip
            return cv2.flip(frame, 1)
        if effect_name == 'Mirror Y':
            # Vertical flip
            return cv2.flip(frame, 0)
        # The dropdown label is stored with a mis-encoded degree sign; accept
        # that spelling and the correctly encoded one.
        if effect_name in ('Rotate 90ยฐ', 'Rotate 90°'):
            return cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
        if effect_name == 'Black & White':
            # Grayscale, then back to three channels so downstream code
            # keeps receiving the same frame layout
            gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            return cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)
        return frame
    except Exception as e:
        print(f"Effect processing error: {e}")
        return frame
def display_update_loop_with_effects(state):
    """Show incoming frames in the image widget with the selected live effect.

    Takes frames from ``state.frame_queue``, applies the effect chosen in
    ``effect_dropdown``, stores the result in the global ``current_frame``
    (used by the snapshot button), pushes a 640x360 JPEG to ``image_widget``
    and refreshes the status labels. Runs until ``state.stop_event`` is set.
    """
    global current_frame, frame_stats
    while not state.stop_event.is_set():
        try:
            frame_data = state.frame_queue.get(timeout=0.1)
        except queue.Empty:
            continue

        try:
            raw_frame = frame_data['frame']

            # Apply selected effect
            current_effect = effect_dropdown.value
            processed_frame = apply_moviepy_effect(raw_frame, current_effect)
            current_frame = processed_frame

            # Resize for display
            display_frame = cv2.resize(processed_frame, (640, 360))

            # Frames in the pipeline are RGB, but OpenCV's encoders expect
            # BGR; without this conversion the preview shows red and blue
            # swapped.
            encoded_ok, buffer = cv2.imencode(
                '.jpg',
                cv2.cvtColor(display_frame, cv2.COLOR_RGB2BGR),
                [cv2.IMWRITE_JPEG_QUALITY, 80],
            )
            if encoded_ok:
                image_widget.value = buffer.tobytes()

            # Update stats (FPS is recomputed about once per second)
            frame_stats['count'] += 1
            current_time = time.time()
            time_diff = current_time - frame_stats['last_time']
            if time_diff >= 1.0:
                frame_stats['fps'] = frame_stats['count'] / time_diff
                frame_stats['count'] = 0
                frame_stats['last_time'] = current_time

            # Update info displays
            info_label.value = (
                f"<b>Status:</b> Streaming | <b>FPS:</b> {frame_stats['fps']:.1f} | "
                f"<b>Frame:</b> {frame_data['frame_number']} | <b>Effect:</b> {current_effect} | "
                f"<b>Resolution:</b> {raw_frame.shape[1]}x{raw_frame.shape[0]}"
            )

            # Update recording info. Read the start time once: it is written
            # by another thread and may still be None.
            started_at = state.recording_start_time
            if state.is_recording and started_at is not None:
                recording_duration = time.time() - started_at
                recording_info.value = (
                    '<b style="color: red;">● RECORDING</b> | '
                    f"<b>Duration:</b> {recording_duration:.1f}s | "
                    f"<b>Segments:</b> {len(state.recording_segments)}"
                )
            else:
                recording_info.value = (
                    "<b>Recording:</b> Stopped | "
                    f"<b>Segments:</b> {len(state.recording_segments)}"
                )
        except Exception as e:
            print(f"Display update error: {e}")
            time.sleep(0.1)
def start_streaming(_):
    """Start the receiver, display and recording threads on a fresh state.

    Button handler; the argument (the clicked button) is ignored. Does
    nothing when a receiver thread is already running. Segments recorded
    before a restart stay available as long as their temporary directory
    still exists.
    """
    global moviepy_state
    if moviepy_state.receiver_thread and moviepy_state.receiver_thread.is_alive():
        print("Stream already running!")
        return

    previous_state = moviepy_state
    # The receiver may have exited on its own (for example when no source
    # was found) while the display/recording threads are still looping on
    # the old state; make sure they stop before new ones are started.
    previous_state.stop_all()

    # Reset state
    new_state = NDIMoviePyState()
    if os.path.isdir(previous_state.temp_dir):
        # Keep earlier recordings: reuse the old directory and share the
        # same segment list so a late save from the old recording thread
        # still shows up. The directory the new state just created is unused.
        import shutil
        shutil.rmtree(new_state.temp_dir, ignore_errors=True)
        new_state.temp_dir = previous_state.temp_dir
        new_state.recording_segments = previous_state.recording_segments

    # Update recording FPS from slider
    new_state.recording_fps = fps_slider.value

    # Create threads (daemon so they never block interpreter shutdown)
    new_state.receiver_thread = threading.Thread(
        target=ndi_receiver_with_recording,
        args=(new_state,),
        daemon=True
    )
    new_state.display_thread = threading.Thread(
        target=display_update_loop_with_effects,
        args=(new_state,),
        daemon=True
    )
    new_state.recording_thread = threading.Thread(
        target=recording_processor,
        args=(new_state,),
        daemon=True
    )

    moviepy_state = new_state

    # Start all threads
    new_state.receiver_thread.start()
    new_state.display_thread.start()
    new_state.recording_thread.start()

    info_label.value = "<b>Status:</b> Starting stream..."
    print("NDI stream with recording capability started")
def stop_streaming(_):
    """Stop all streaming threads and reset the status widgets.

    Button handler; the argument (the clicked button) is ignored.
    """
    global moviepy_state
    moviepy_state.stop_all()
    # stop_all() also ends an active recording, so the record button must
    # not keep showing "Recording...".
    record_button.description = "Start Recording"
    record_button.button_style = 'warning'
    info_label.value = "<b>Status:</b> Stopped"
    recording_info.value = "<b>Recording:</b> Stopped"
    print("NDI stream stopped")
def start_recording(_):
    """Start recording the running NDI stream.

    Button handler; the argument (the clicked button) is ignored. Requires a
    live receiver thread and does nothing when a recording is already active.
    """
    global moviepy_state
    if not moviepy_state.receiver_thread or not moviepy_state.receiver_thread.is_alive():
        print("Start streaming first!")
        return
    if moviepy_state.is_recording:
        # A second click would reset the start time in the middle of the
        # segment and make the displayed duration jump back to zero.
        print("Already recording!")
        return
    moviepy_state.start_recording()
    record_button.description = "Recording..."
    record_button.button_style = 'danger'
    print("Recording started")
def stop_recording(_):
    """Button handler: end the current recording and restore the record button.

    The argument (the clicked button) is ignored.
    """
    active_state = moviepy_state
    active_state.stop_recording()

    # Put the record button back into its idle look
    record_button.button_style = 'warning'
    record_button.description = "Start Recording"

    print("Recording stopped")
def save_snapshot(_):
    """Save the most recent displayed frame (effects applied) as a JPEG.

    Button handler; the argument (the clicked button) is ignored. The file
    is written to the current working directory as
    ``ndi_snapshot_<effect>_<timestamp>.jpg``.
    """
    # Read the global once: the display thread rebinds it for every frame.
    frame = current_frame
    if frame is None:
        print("No frame available to save")
        return

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Keep only ASCII letters, digits, '-' and '_' from the effect label so
    # the file name is valid everywhere (labels contain spaces, '/', '&' and
    # non-ASCII characters).
    effect_name = ''.join(
        ch if (ch.isalnum() and ord(ch) < 128) or ch in '-_' else '_'
        for ch in effect_dropdown.value
    )
    filename = f"ndi_snapshot_{effect_name}_{timestamp}.jpg"

    # Frames are RGB; imwrite expects BGR. imwrite reports failure through
    # its return value, not an exception.
    if cv2.imwrite(filename, cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)):
        print(f"Snapshot saved: {filename}")
    else:
        print(f"Failed to save snapshot: {filename}")
# Wire every control button to its click handler
for _button, _click_handler in (
    (start_button, start_streaming),
    (stop_button, stop_streaming),
    (record_button, start_recording),
    (stop_record_button, stop_recording),
    (snapshot_button, save_snapshot),
):
    _button.on_click(_click_handler)
def create_professional_video_sequence():
    """Combine all recorded segments into one edited video.

    Each segment gets an effect chosen by its position (fade-in, color
    multiplication, contrast, then a rotating set of extras), the last one
    fades out, the results are concatenated, a title is overlaid at the
    start and the result is written to
    ``ndi_professional_sequence_<timestamp>.mp4`` in the working directory.

    Returns:
        The output file name, or None when there are no segments or an
        error occurred (the error is printed).
    """
    # Snapshot the list: the recording thread may append while we work.
    segments = list(moviepy_state.recording_segments)
    if not segments:
        print("No recorded segments available!")
        return None
    print(f"Processing {len(segments)} recorded segments...")

    def gaussian_blur(clip, sigma):
        """Blur every frame with OpenCV (kernel size derived from sigma)."""
        return clip.fl_image(lambda image: cv2.GaussianBlur(image, (0, 0), sigma))

    # Effects are taken from `vfx`: under MoviePy 1.x the names imported from
    # `moviepy.video.fx` are submodules, which `clip.fx` cannot call.
    extra_effects = [
        lambda c: gaussian_blur(c, 1.5),
        lambda c: c.fx(vfx.mirror_x),
        lambda c: c.fx(vfx.rotate, 10),
        lambda c: c.fx(vfx.colorx, 0.5),  # multiplies RGB by 0.5 (darker)
    ]

    opened_clips = []  # everything that must be closed, even on failure
    try:
        clips = []
        for i, segment in enumerate(segments):
            print(f"Loading segment {i+1}: {segment['path']}")
            clip = VideoFileClip(segment['path'])
            opened_clips.append(clip)

            if i == 0:
                # First clip: fade in
                processed_clip = clip.fx(vfx.fadein, 1.0)
            elif i == 1:
                # Second clip: color multiplication
                processed_clip = clip.fx(vfx.colorx, 1.5)
            elif i == 2:
                # Third clip: brightness/contrast adjustment
                processed_clip = clip.fx(vfx.lum_contrast, 0, 50, 128)
            else:
                # Additional clips: cycle through the extra effects
                processed_clip = extra_effects[i % len(extra_effects)](clip)

            # Add fade out to last clip
            if i == len(segments) - 1:
                processed_clip = processed_clip.fx(vfx.fadeout, 1.0)
            clips.append(processed_clip)

        # Concatenate all clips
        print("Concatenating clips...")
        final_video = concatenate_videoclips(clips, method="compose")
        opened_clips.append(final_video)

        # Title overlay; never longer than the video itself, otherwise the
        # composite would be padded beyond the end of the footage.
        title_text = TextClip("NDI Live Recording",
                              fontsize=50,
                              color='white',
                              font='Arial-Bold')
        title_text = title_text.set_position('center').set_duration(
            min(3, final_video.duration)
        )
        opened_clips.append(title_text)

        # Composite title over video
        final_with_title = CompositeVideoClip([final_video, title_text])
        opened_clips.append(final_with_title)

        # Export final video
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_filename = f"ndi_professional_sequence_{timestamp}.mp4"
        print(f"Exporting final video: {output_filename}")
        final_with_title.write_videofile(
            output_filename,
            codec='libx264',
            fps=moviepy_state.recording_fps,
            audio=False,
            preset='medium',
            verbose=False,
            logger=None
        )
        print(f"Professional video sequence created: {output_filename}")

        # Display final video info
        final_clip = VideoFileClip(output_filename)
        try:
            print(f"Final video duration: {final_clip.duration:.2f} seconds")
            print(f"Final video size: {final_clip.size}")
        finally:
            final_clip.close()
        return output_filename
    except Exception as e:
        print(f"Error creating video sequence: {e}")
        return None
    finally:
        # Release file readers whether or not the export succeeded
        for opened in reversed(opened_clips):
            try:
                opened.close()
            except Exception as close_error:
                print(f"Warning: could not close clip: {close_error}")
# Create export button
export_button = widgets.Button(
    description="Create Professional Video",
    button_style='success',
    icon='video'
)


def export_professional_video(_):
    """Export button handler: build the final video and report the outcome.

    The argument (the clicked button) is ignored.
    """
    output_file = create_professional_video_sequence()
    if output_file:
        # The emoji had been mis-encoded, which split this literal across
        # two lines and made the statement a syntax error.
        print(f"✅ Professional video exported successfully: {output_file}")
    else:
        print("❌ Failed to export video")


export_button.on_click(export_professional_video)
# Add export button to display
export_row = widgets.HBox([export_button])
display_box.children = display_box.children + (export_row,)
def create_advanced_effects_showcase():
    """Build a short showcase video from the first recorded segment.

    The showcase concatenates a split screen, a picture-in-picture shot, a
    2x speed-up and two brightness/contrast variants, and writes the result
    to ``ndi_advanced_effects_<timestamp>.mp4`` in the working directory.

    Returns:
        The output file name, or None when there are no segments or an
        error occurred (the error is printed).
    """
    segments = list(moviepy_state.recording_segments)
    if not segments:
        print("No recorded segments available!")
        return None
    print("Creating advanced effects showcase...")

    def gaussian_blur(clip, sigma):
        """Blur every frame with OpenCV (kernel size derived from sigma)."""
        return clip.fl_image(lambda image: cv2.GaussianBlur(image, (0, 0), sigma))

    def first_seconds(clip, seconds):
        """Return at most the first `seconds` of clip, never past its end."""
        return clip.subclip(0, min(seconds, clip.duration))

    opened_clips = []  # everything that must be closed, even on failure
    try:
        # Load base clip
        base_clip = VideoFileClip(segments[0]['path'])
        opened_clips.append(base_clip)

        clips_with_effects = []

        # 1. Split screen: original left half next to a darkened right half.
        # The canvas size must be given explicitly; by default the composite
        # takes the size of its first clip (half the width), which would
        # stack both halves on top of each other.
        half_width = base_clip.w // 2
        left_half = base_clip.crop(x1=0, x2=half_width)
        right_half = base_clip.crop(x1=half_width, x2=base_clip.w).fx(vfx.colorx, 0.3)
        split_screen = CompositeVideoClip(
            [left_half.set_position('left'), right_half.set_position('right')],
            size=base_clip.size,
        )
        opened_clips.append(split_screen)
        clips_with_effects.append(first_seconds(split_screen, 3))

        # 2. Picture-in-picture: small framed copy over a blurred background
        main_clip = gaussian_blur(base_clip, 2)
        pip_clip = base_clip.resize(0.3).margin(10).set_position(('right', 'bottom'))
        pip_composite = CompositeVideoClip([main_clip, pip_clip])
        opened_clips.append(pip_composite)
        clips_with_effects.append(first_seconds(pip_composite, 3))

        # 3. Time effect: 2x speed (the result lasts half as long as the source)
        speed_up = base_clip.fx(vfx.speedx, 2.0)
        clips_with_effects.append(first_seconds(speed_up, 2))

        # 4. Two looks built from RGB multiplication plus luminosity/contrast
        brighter_clip = base_clip.fx(vfx.colorx, 1.2).fx(vfx.lum_contrast, 0, 30, 140)
        darker_clip = base_clip.fx(vfx.colorx, 0.8).fx(vfx.lum_contrast, 0, -20, 100)
        clips_with_effects.extend([
            first_seconds(brighter_clip, 2),
            first_seconds(darker_clip, 2),
        ])

        # Combine all effects
        final_showcase = concatenate_videoclips(clips_with_effects)
        opened_clips.append(final_showcase)

        # Background music placeholder (would need an audio file):
        # background_audio = AudioFileClip("background.mp3").set_duration(final_showcase.duration)
        # final_showcase = final_showcase.set_audio(background_audio)

        # Export
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_filename = f"ndi_advanced_effects_{timestamp}.mp4"
        final_showcase.write_videofile(
            output_filename,
            codec='libx264',
            fps=moviepy_state.recording_fps,
            audio=False,
            preset='medium',
            verbose=False,
            logger=None
        )
        print(f"Advanced effects showcase created: {output_filename}")
        return output_filename
    except Exception as e:
        print(f"Error creating advanced effects: {e}")
        return None
    finally:
        # Release file readers whether or not the export succeeded
        for opened in reversed(opened_clips):
            try:
                opened.close()
            except Exception as close_error:
                print(f"Warning: could not close clip: {close_error}")
# Advanced effects button
advanced_effects_button = widgets.Button(
    description="Advanced Effects Showcase",
    button_style='info',
    icon='magic'
)


def create_advanced_effects(_):
    """Button handler: build the effects showcase and report success.

    The argument (the clicked button) is ignored.
    """
    output_file = create_advanced_effects_showcase()
    if output_file:
        # The emoji had been mis-encoded, which split this literal across
        # two lines and made the statement a syntax error.
        print(f"✅ Advanced effects showcase created: {output_file}")


advanced_effects_button.on_click(create_advanced_effects)
# Add to display
advanced_row = widgets.HBox([advanced_effects_button])
display_box.children = display_box.children + (advanced_row,)
def cleanup_ndi_moviepy():
    """Stop all threads, delete the temporary recordings and reset the widgets."""
    global moviepy_state
    print("Cleaning up NDI MoviePy system...")
    # Stop all operations
    moviepy_state.stop_all()
    # Clean up temporary files
    moviepy_state.cleanup_temp_files()
    # The segment files were just deleted; forget them so the export
    # functions do not try to open paths that no longer exist.
    moviepy_state.recording_segments.clear()
    # Clear display
    image_widget.value = b''
    info_label.value = "<b>Status:</b> Cleaned up"
    recording_info.value = "<b>Recording:</b> Cleaned up"
    # The emoji had been mis-encoded, which split this literal across two
    # lines and made the statement a syntax error.
    print("✅ NDI MoviePy system cleaned up successfully")
def list_recorded_segments():
    """Print timestamp, duration, frame count and path of every recorded segment."""
    segments = moviepy_state.recording_segments
    if not segments:
        print("No recorded segments available")
        return
    # The camera emoji had been stored mis-encoded; restored here.
    print(f"📹 {len(segments)} recorded segments:")
    for i, segment in enumerate(segments):
        print(f" {i+1}. {segment['timestamp']} - {segment['duration']:.1f}s ({segment['frames']} frames)")
        print(f" Path: {segment['path']}")
# Housekeeping controls: wipe everything / print the segment list
cleanup_button = widgets.Button(
    description="Cleanup All",
    button_style='danger',
    icon='trash',
)
list_segments_button = widgets.Button(
    description="List Segments",
    button_style='info',
    icon='list',
)


def cleanup_handler(_):
    """Click handler for "Cleanup All"; the clicked button is ignored."""
    cleanup_ndi_moviepy()


def list_segments_handler(_):
    """Click handler for "List Segments"; the clicked button is ignored."""
    list_recorded_segments()


for _control, _callback in (
    (cleanup_button, cleanup_handler),
    (list_segments_button, list_segments_handler),
):
    _control.on_click(_callback)

# Append the management row below the existing controls
management_row = widgets.HBox([cleanup_button, list_segments_button])
display_box.children = (*display_box.children, management_row)
# Run the cleanup automatically when the Python process (kernel) exits.
# atexit handlers run at normal interpreter shutdown, not on an interrupt.
import atexit
atexit.register(cleanup_ndi_moviepy)

# Startup banner (the emoji had been stored mis-encoded; restored here)
print("🎬 NDI to MoviePy Professional Video Processing System Ready!")
print("📋 Instructions:")
print("1. Click 'Start Stream' to begin NDI capture")
print("2. Click 'Start Recording' to record segments")
print("3. Click 'Stop Recording' to end a segment")
print("4. Use 'Create Professional Video' to process with MoviePy effects")
print("5. Try 'Advanced Effects Showcase' for complex compositions")
- Live NDI stream capture with effects preview
- Professional recording at configurable FPS
- Real-time MoviePy-style effects application
- Multi-segment recording with automatic saving
- MoviePy integration for advanced effects (fade, color grading, blur, etc.)
- Professional export with titles and compositions
- Split-screen, picture-in-picture, and time effects
- Color enhancement and grading
- Spatial effects (mirror, rotate, crop)
- Temporal effects (speed changes, fade transitions)
- Composite effects (multiple layers, text overlays)
- Automatic file management and cleanup
- Segment-based recording for easy editing
- Professional codec settings and export options
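- Queue-based hand-off between the capture, preview and recording threads (note: the frames of a recording segment are held in memory until the segment is saved)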
This system gives you the same level of professional control as your MoviePy example, but applied to live NDI streams!