Skip to content

Instantly share code, notes, and snippets.

@minimalefforttech
Last active March 31, 2025 08:29
Show Gist options
  • Save minimalefforttech/d683c209028445426e5fe44ce0c12968 to your computer and use it in GitHub Desktop.
Save minimalefforttech/d683c209028445426e5fe44ce0c12968 to your computer and use it in GitHub Desktop.
This example shows how to use psutil to see which files a subprocess is currently writing to. It is useful when the subprocess provides no status updates to show the user, such as during an Alembic export.
import os
import sys
import time
import random
from pathlib import Path
import threading
from typing import List, IO, Optional
# Global registry of open file handles. Writer threads add their handle here
# while writing; main() closes anything left over when the script exits.
open_file_handles: List[IO[bytes]] = []
# Guards every read/write of open_file_handles across writer threads.
file_handle_lock = threading.Lock()


def write_alembic_file(file_path: Path, file_size: int) -> None:
    """
    Write a simulated Alembic file with random data.

    The file is written 1 MB at a time with a random 0.5-2.0 second pause
    after each chunk, so the handle stays open long enough for an external
    observer to see it. Errors raised while writing are printed rather than
    propagated; errors raised by ``open`` itself still propagate.

    Args:
        file_path: Path where the file should be written
        file_size: Size of the file in MB
    """
    f = open(file_path, 'wb')
    with file_handle_lock:
        open_file_handles.append(f)
    try:
        # ``with f`` guarantees the handle is closed even when a write fails,
        # instead of leaving it open until process-wide cleanup.
        with f:
            for _ in range(file_size):
                f.write(os.urandom(1024 * 1024))  # 1MB of random data
                f.flush()
                time.sleep(random.uniform(0.5, 2.0))
            print(f"Finishing write to {file_path.name}")
    except Exception as e:
        print(f"Error writing {file_path.name}: {str(e)}")
    finally:
        # Always deregister, on success and on failure, so the registry only
        # ever holds handles that are genuinely still in use.
        with file_handle_lock:
            if f in open_file_handles:
                open_file_handles.remove(f)
def main() -> None:
    """
    Write a batch of simulated Alembic files using a bounded set of threads.

    Five files named ``asset_01.abc`` .. ``asset_05.abc`` are written into
    ``./temp_alembic``, each 1-5 MB, with at most three writer threads alive
    at once. Any handle still registered when the function exits is closed.
    """
    target_dir = Path("./temp_alembic")
    target_dir.mkdir(exist_ok=True)
    total_files = 5
    worker_limit = 3
    running: List[threading.Thread] = []

    try:
        for i in range(total_files):
            filename = f"asset_{i+1:02d}.abc"
            size_mb = random.randint(1, 5)
            worker = threading.Thread(
                target=write_alembic_file,
                args=(target_dir / filename, size_mb),
                name=f"Writer-{filename}"
            )

            # Wait for a free slot: drop finished threads, and poll again
            # after a short pause while every slot is still busy.
            while len(running) >= worker_limit:
                running = [t for t in running if t.is_alive()]
                if len(running) >= worker_limit:
                    time.sleep(0.1)

            worker.start()
            running.append(worker)

        # Wait for all writing to complete
        for worker in running:
            worker.join()

        # Keep process running for a bit
        time.sleep(2)
    finally:
        # Best-effort cleanup of whatever handles are still registered
        with file_handle_lock:
            for handle in open_file_handles:
                try:
                    handle.close()
                except Exception:
                    pass
            open_file_handles.clear()


# Run the writer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
from PySide6 import QtWidgets, QtCore, QtGui
import sys
import subprocess
import psutil
import os
import time
from pathlib import Path
import threading
from typing import List, Dict, Tuple, Optional, Set
class FileMonitorWindow(QtWidgets.QMainWindow):
    """
    Window that monitors and displays file operations from a subprocess.

    This window launches a subprocess that writes Alembic files and monitors
    which files are open and being modified in real-time. Polling runs on a
    background thread; all widget updates go through Qt signals.
    """

    # Custom signals for thread-safe UI updates
    update_signal = QtCore.Signal(list, int, int)
    log_signal = QtCore.Signal(str)

    # Directory (relative to the current working directory) scanned for
    # .abc files written by the subprocess.
    OUTPUT_DIR = Path("./temp_alembic")

    def __init__(self) -> None:
        super().__init__()
        self.setWindowTitle("Alembic Export Monitor")
        self.setGeometry(100, 100, 800, 600)
        self._setup_ui()

        # Process monitoring state
        self.process: Optional[subprocess.Popen] = None
        self.monitor_thread: Optional[threading.Thread] = None
        self.monitoring: bool = False
        self.last_check_times: Dict[str, Tuple[float, int]] = {}  # path -> (mtime, size)
        self.completed_files: int = 0
        self.total_files_expected: int = 5
        self.completed_file_paths: Set[str] = set()  # Track files already counted as completed

    def _setup_ui(self) -> None:
        """Set up the user interface components."""
        # Main layout
        central_widget = QtWidgets.QWidget()
        self.setCentralWidget(central_widget)
        layout = QtWidgets.QVBoxLayout(central_widget)

        # Status and progress
        self.status_label = QtWidgets.QLabel("Status: Idle")
        layout.addWidget(self.status_label)
        self.progress_bar = QtWidgets.QProgressBar()
        layout.addWidget(self.progress_bar)

        # File list
        layout.addWidget(QtWidgets.QLabel("Currently Open Files:"))
        self.file_list = QtWidgets.QListWidget()
        layout.addWidget(self.file_list)

        # Log output
        layout.addWidget(QtWidgets.QLabel("Process Output:"))
        self.log_area = QtWidgets.QTextEdit()
        self.log_area.setReadOnly(True)
        layout.addWidget(self.log_area)

        # Control buttons
        button_layout = QtWidgets.QHBoxLayout()
        self.start_button = QtWidgets.QPushButton("Start Alembic Export")
        self.start_button.clicked.connect(self.start_process)
        button_layout.addWidget(self.start_button)
        layout.addLayout(button_layout)

        # Connect signals
        self.update_signal.connect(self.update_ui)
        self.log_signal.connect(self.append_log)

    def start_process(self) -> None:
        """Start the Alembic export subprocess and begin monitoring."""
        if self.process and self.process.poll() is None:
            self.log_signal.emit("Process is already running")
            return

        # Reset state
        self.status_label.setText("Status: Starting process...")
        self.log_area.clear()
        self.file_list.clear()
        self.progress_bar.setValue(0)
        self.completed_files = 0
        self.last_check_times = {}
        self.completed_file_paths.clear()

        # Start the subprocess
        script_path = Path(__file__).parent / "fake_alembic_writer.py"
        self.process = subprocess.Popen(
            [sys.executable, str(script_path)],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1
        )
        self.log_signal.emit(f"Started process with PID {self.process.pid}")

        # Begin monitoring
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self.monitor_process)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def monitor_process(self) -> None:
        """
        Monitor the subprocess and its children for open files.

        Runs on the monitor thread. Polls every 0.5 seconds while the
        subprocess is alive and monitoring is enabled, then performs a final
        tally, drains the subprocess output and emits a last UI update.
        """
        CHECK_INTERVAL = 0.5
        try:
            parent_process = psutil.Process(pid=self.process.pid)
            while self.monitoring and self.process.poll() is None:
                # Get all processes (parent and children)
                try:
                    all_processes = [parent_process] + parent_process.children(recursive=True)
                except psutil.NoSuchProcess:
                    break

                open_files = self._collect_open_abc_files(all_processes)
                current_files: Set[str] = set(open_files)

                # Order matters: closed files found on disk must be tracked
                # before the completion check so they are counted this pass.
                self._track_closed_output_files(current_files)
                file_data = self._stat_open_files(open_files)
                self._mark_completed(current_files)

                self.update_signal.emit(file_data, self.completed_files, self.total_files_expected)
                time.sleep(CHECK_INTERVAL)

            # Process has ended - make sure we caught all completed files
            self._count_remaining_files()
            self.read_process_output()
            self.update_signal.emit([], self.completed_files, self.total_files_expected)
            self.log_signal.emit("Process has completed")
        except Exception as e:
            self.log_signal.emit(f"Error in monitor thread: {str(e)}")
        finally:
            self.monitoring = False

    def _collect_open_abc_files(self, processes: list) -> List[str]:
        """Return paths of .abc files currently open in any of ``processes``."""
        open_files: List[str] = []
        for proc in processes:
            try:
                proc_files = proc.open_files()
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                # The process vanished or is off-limits; skip it.
                continue
            open_files.extend(f.path for f in proc_files if f.path.endswith('.abc'))
        return open_files

    def _track_closed_output_files(self, current_files: Set[str]) -> None:
        """Start tracking .abc files on disk that were opened and closed between polls."""
        if not self.OUTPUT_DIR.exists():
            return
        for abc_file in self.OUTPUT_DIR.glob("*.abc"):
            file_path = str(abc_file.absolute())
            if (file_path in current_files
                    or file_path in self.completed_file_paths
                    or file_path in self.last_check_times):
                continue
            try:
                stat = os.stat(file_path)
            except (FileNotFoundError, PermissionError):
                continue
            self.last_check_times[file_path] = (stat.st_mtime, stat.st_size)
            self.log_signal.emit(f"Found new completed file: {abc_file.name}")

    def _stat_open_files(self, open_files: List[str]) -> List[Tuple[str, float, int, bool]]:
        """Build (path, mtime, size, is_active) rows for open files and refresh tracking."""
        file_data: List[Tuple[str, float, int, bool]] = []
        for file_path in open_files:
            try:
                stat = os.stat(file_path)
            except (FileNotFoundError, PermissionError):
                continue
            modified_time = stat.st_mtime
            size = stat.st_size

            # A file seen for the first time counts as active; otherwise it
            # is active only if it grew or was touched since the last poll.
            previous = self.last_check_times.get(file_path)
            if previous is None:
                is_active = True
            else:
                last_time, last_size = previous
                is_active = (modified_time > last_time) or (size > last_size)

            file_data.append((file_path, modified_time, size, is_active))
            self.last_check_times[file_path] = (modified_time, size)
        return file_data

    def _mark_completed(self, current_files: Set[str]) -> None:
        """Count tracked files that are no longer open as completed."""
        newly_completed = [
            path for path in list(self.last_check_times)
            if path not in current_files and path not in self.completed_file_paths
        ]
        for path in newly_completed:
            self.log_signal.emit(f"File completed: {Path(path).name}")
            self.completed_file_paths.add(path)
            del self.last_check_times[path]
        self.completed_files += len(newly_completed)

    def _count_remaining_files(self) -> None:
        """Count every .abc file in the output directory not yet counted as completed."""
        if not self.OUTPUT_DIR.exists():
            return
        all_abc_files = {str(f.absolute()) for f in self.OUTPUT_DIR.glob("*.abc")}
        # Files still being tracked were open at the last poll; the process
        # has ended since, so they are included here rather than skipped.
        remaining = all_abc_files - self.completed_file_paths
        if not remaining:
            return
        final_completed = len(remaining)
        self.log_signal.emit(f"Found {final_completed} additional completed files at process end")
        self.completed_file_paths |= remaining
        for path in remaining:
            self.last_check_times.pop(path, None)
        self.completed_files += final_completed

    def read_process_output(self) -> None:
        """
        Read and log output from the subprocess.

        Reads until end-of-file. Blank lines are skipped instead of being
        treated as the end of the output.
        """
        if not self.process or self.process.stdout is None:
            return
        for raw_line in self.process.stdout:
            line = raw_line.strip()
            if line:
                self.log_signal.emit(line)

    def update_ui(self, file_data: List[Tuple[str, float, int, bool]],
                  completed: int, total: int) -> None:
        """
        Update the UI with current file information.

        Args:
            file_data: List of (path, modified_time, size, is_active) for open files
            completed: Number of completed files
            total: Total expected files
        """
        self.file_list.clear()

        # Add each file with its status
        for path, modified, size, is_active in file_data:
            name = Path(path).name
            size_mb = size / (1024 * 1024)
            mod_time = time.strftime("%H:%M:%S", time.localtime(modified))
            status = "ACTIVE" if is_active else "idle"
            item = QtWidgets.QListWidgetItem(
                f"{name} - {size_mb:.2f} MB - Last modified: {mod_time} - {status}"
            )
            if is_active:
                item.setForeground(QtGui.QColor("green"))
            self.file_list.addItem(item)

        # Update progress and status. Clamp to 100 so an unexpectedly high
        # completed count cannot produce an out-of-range progress value.
        progress = min(100, int((completed / total) * 100)) if total > 0 else 0
        self.progress_bar.setValue(progress)
        open_files = len(file_data)
        self.status_label.setText(f"Status: {open_files} files open, {completed} files completed of {total} total ({progress}%)")

    def append_log(self, text: str) -> None:
        """Add text to the log area with auto-scroll."""
        self.log_area.append(text)
        cursor = self.log_area.textCursor()
        cursor.movePosition(QtGui.QTextCursor.End)
        self.log_area.setTextCursor(cursor)

    def closeEvent(self, event: QtGui.QCloseEvent) -> None:
        """Handle window close event by cleaning up resources."""
        # Tell the monitor loop to stop before tearing down the subprocess.
        self.monitoring = False
        if self.process and self.process.poll() is None:
            self.process.terminate()
            try:
                self.process.wait(timeout=3)
            except subprocess.TimeoutExpired:
                # Graceful termination did not finish in time; force it.
                self.process.kill()
        event.accept()
def main() -> None:
    """Create the Qt application, show the monitor window and run the event loop."""
    qt_app = QtWidgets.QApplication(sys.argv)
    monitor_window = FileMonitorWindow()
    monitor_window.show()
    # Hand the event loop's return code back to the shell.
    exit_code = qt_app.exec()
    sys.exit(exit_code)


# Launch the GUI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment