|
#!/usr/bin/env python3 |
|
''' |
|
Terminal Diffusion - A simple image generator to demonstrate ComfyUI invocation |
|
and inline graphics in terminals like Ghostty and iTerm2 that support it. |
|
|
|
Author: @thinkyhead |
|
Date: 2025-01-15 |
|
''' |
|
import os, re |
|
import argparse |
|
import base64 |
|
import json |
|
import websocket, asyncio, urllib |
|
import random |
|
|
|
from requests import get |
|
|
|
# API Configuration |
|
COMFYUI_BASE_URL = "http://127.0.0.1:8188" |
|
|
|
# A unique identifier. We may need a serial number too. |
|
CLIENT_ID = "imagine-v1" |
|
HTTP_HEADERS = { "User-Agent": "Mozilla/5.0" } |
|
|
|
# Local image folder, if known |
|
COMFYUI_IMAGE_FOLDER = None |
|
#HOME=os.environ.get('HOME') |
|
#COMFYUI_IMAGE_FOLDER = f"{HOME}/Projects/StableDiffusion/ComfyUI/output/comfyui" |
|
|
|
DEFAULT_WORKFLOW = "sd15-lcm" |
|
#DEFAULT_WORKFLOW = "flux-dev" |
|
#DEFAULT_WORKFLOW = "s35-large" |
|
|
|
OUTPUT_PREFIX = "imagine" |
|
|
|
HERE=os.path.dirname(os.path.abspath(__file__)) |
|
|
|
verbose = False |
|
def log(message): |
|
if verbose: |
|
print(message) |
|
|
|
spinner = ["🌄", "🌃", "🌅", "🌌", "🌇", "🌉", "🌆"] |
|
def print_spinner(): |
|
print(spinner.pop(0) + ' hold on...', end="\r", flush=True) |
|
spinner.append(spinner[0]) |
|
|
|
# Populate a workflow with our custom parameters |
|
def populate_workflow(desc, args): |
|
# CLI arguments plus other configuration... |
|
workflow_inputs = args.__dict__ |
|
workflow_inputs['filename_prefix'] = OUTPUT_PREFIX |
|
|
|
# Merge the workflow with the arguments |
|
workflow = desc['workflow'] |
|
fieldmap = desc['fieldmap'] |
|
|
|
# Apply the ratio using the workflow width * height as the native pixel count |
|
if args.ratio: |
|
r = args.ratio.split("/") if ("/" in args.ratio) else args.ratio.split(":") |
|
rw, rh = int(r[0]), int(r[1]) |
|
ratio = rw / rh |
|
fw, fh = fieldmap['width'], fieldmap['height'] |
|
nw, nh = workflow[fw[0]]['inputs'][fw[1]], workflow[fh[0]]['inputs'][fh[1]] |
|
pixels = nw * nh |
|
# Keep close to the same number of pixels |
|
ow = int((((pixels * ratio) ** 0.5) // 64) * 64) |
|
oh = int(((ow / ratio) // 64) * 64) |
|
# Update the workflow |
|
workflow[fw[0]]['inputs'][fw[1]], workflow[fh[0]]['inputs'][fh[1]] = ow, oh |
|
|
|
# Merge the workflow with the arguments |
|
log(f"Workflow:") |
|
for key, map in fieldmap.items(): |
|
nr, fld = map |
|
if key in workflow_inputs: |
|
if workflow_inputs[key] != None: |
|
workflow[nr]["inputs"][fld] = workflow_inputs[key] |
|
log(f" {key}: {workflow[nr]['inputs'][fld]}") |
|
|
|
# Load a workflow from a JSON file and populate it with the given prompt |
|
def load_and_populate_workflow(args): |
|
workflow_file = f"{args.workflow}.json" |
|
log(f"load_and_populate_workflow {workflow_file}") |
|
try: |
|
workflow_path = f"{HERE}/{workflow_file}" |
|
with open(workflow_path, 'r') as file: |
|
desc = json.load(file) |
|
populate_workflow(desc, args) |
|
return desc['workflow'] |
|
except FileNotFoundError: |
|
print(f"The file {workflow_file} was not found.") |
|
return None |
|
except json.JSONDecodeError: |
|
print(f"The file {workflow_file} contains invalid JSON.") |
|
return None |
|
|
|
# Enqueue a workflow for image generation |
|
def queue_workflow(workflow): |
|
p = { 'prompt': workflow, 'client_id': CLIENT_ID } |
|
data = json.dumps(p).encode("utf-8") |
|
log(f"queue_workflow data: {data}") |
|
try: |
|
req = urllib.request.Request( |
|
f"{COMFYUI_BASE_URL}/prompt", data=data, headers=HTTP_HEADERS |
|
) |
|
response = urllib.request.urlopen(req).read() |
|
return json.loads(response) |
|
except Exception as e: |
|
print(f"Error while queuing prompt: {e}") |
|
raise e |
|
|
|
# The URL for the given image filename, subfolder, and type (such as thumbnail) |
|
def get_image_url(filename, subfolder, folder_type): |
|
data = { 'filename': filename, 'subfolder': subfolder, 'type': folder_type } |
|
url_values = urllib.parse.urlencode(data) |
|
return f"{COMFYUI_BASE_URL}/view?{url_values}" |
|
|
|
# Get the history of a prompt id |
|
def get_history(prompt_id): |
|
log("get_history") |
|
|
|
req = urllib.request.Request( |
|
f"{COMFYUI_BASE_URL}/history/{prompt_id}", headers=HTTP_HEADERS |
|
) |
|
with urllib.request.urlopen(req) as response: |
|
return json.loads(response.read()) |
|
|
|
# Run a workflow, wait for it to complete, return the output images |
|
def run_workflow(ws, workflow): |
|
prompt_id = queue_workflow(workflow)['prompt_id'] |
|
output_images = [] |
|
while True: |
|
print_spinner() |
|
out = ws.recv() |
|
if isinstance(out, str): |
|
message = json.loads(out) |
|
if message["type"] == "executing": |
|
data = message["data"] |
|
if data["node"] is None and data["prompt_id"] == prompt_id: |
|
break # Execution is done |
|
else: |
|
continue # previews are binary data |
|
|
|
history = get_history(prompt_id)[prompt_id] |
|
for o in history['outputs']: |
|
for node_id in history["outputs"]: |
|
node_output = history["outputs"][node_id] |
|
if "images" in node_output: |
|
for image in node_output["images"]: |
|
url = get_image_url( |
|
image["filename"], image["subfolder"], image["type"] |
|
) |
|
output_images.append({"url": url}) |
|
|
|
return { 'data': output_images } |
|
|
|
# Send a workflow to the ComfyUI server, await the result, |
|
# and return the image results from the run_workflow thread. |
|
async def send_workflow_sync(workflow): |
|
try: |
|
# Connect to the WebSocket server |
|
try: |
|
ws_url = COMFYUI_BASE_URL.replace("http", "ws") |
|
ws = websocket.WebSocket() |
|
ws.connect(f"{ws_url}/ws?clientId={CLIENT_ID}") |
|
log("Connected to ComfyUI WebSocket.") |
|
except Exception as e: |
|
print(f"Failed to connect to WebSocket server: {e}") |
|
return None |
|
|
|
# Send the workflow |
|
try: |
|
log("Sending workflow to WebSocket server.") |
|
log(f"Workflow: {workflow}") |
|
images = await asyncio.to_thread(run_workflow, ws, workflow) |
|
except Exception as e: |
|
print(f"Error while receiving images: {e}") |
|
images = None |
|
|
|
ws.close() |
|
return images |
|
|
|
except Exception as e: |
|
print(f"Error communicating with WebSocket server: {e}") |
|
return None |
|
|
|
# Inline Images Protocol |
|
def detect_inline_images_protocol(): |
|
return os.environ.get("TERM_PROGRAM") == "iTerm.app" |
|
|
|
# Kitty Graphics Display Protocol |
|
def detect_kitty_graphics_protocol(): |
|
return os.environ.get("TERM_PROGRAM") == "ghostty" |
|
|
|
# Display the image data in the terminal using available protocols |
|
def display_image_data_in_terminal(img_data): |
|
img_b64 = base64.b64encode(img_data).decode('utf-8') |
|
|
|
if detect_inline_images_protocol(): |
|
# Based on print_image() above... |
|
print(f"\033]1337;File=inline=1;size={len(img_data)};name={OUTPUT_PREFIX}.png;preserveAspectRatio=1;type=image/png:{img_b64}\a") |
|
elif detect_kitty_graphics_protocol(): |
|
print(f"\033_Gf=100,a=T,t=d;{img_b64}\033\\") # Kitty display protocol |
|
else: |
|
log("No inline image support.") |
|
|
|
def display_image_file_in_terminal(image_path): |
|
with open(image_path, "rb") as img_file: |
|
display_image_data_in_terminal(img_file.read()) |
|
|
|
# Fetch the image at the given URL for display. Delete the image after display. |
|
def fetch_and_display_image(url): |
|
response = get(url) |
|
if response.status_code == 200: |
|
display_image_data_in_terminal(response.content) |
|
else: |
|
print(f"Failed to fetch image from URL {url}") |
|
|
|
# Run the workflow with the given prompt and other parameters |
|
# For simplicity assume only one image will be generated. |
|
# With minor changes this could handle workflow batches of any size. |
|
async def generate_image(args): |
|
log("generate_image") |
|
|
|
# Load and populate the workflow |
|
workflow = load_and_populate_workflow(args) |
|
|
|
print_spinner() |
|
result = await send_workflow_sync(workflow) |
|
if result: |
|
log(f"Image result: {result}") |
|
# Result: {'data': [{'url': 'http://127.0.0.1:8188/view?filename=kitty_00006_.png&subfolder=&type=output'}]} |
|
url = result['data'][0]['url'] |
|
|
|
# If COMFYUI_IMAGE_FOLDER is provided, we can directly display the image |
|
if COMFYUI_IMAGE_FOLDER != None: |
|
image_name = url.split("filename=")[1].split("&")[0] |
|
display_image_file_in_terminal(f"{COMFYUI_IMAGE_FOLDER}/{image_name}") |
|
else: |
|
fetch_and_display_image(url) |
|
else: |
|
print("Failed to generate image.") |
|
|
|
async def imagine(args): |
|
if args.prompt: |
|
await generate_image(args) |
|
else: |
|
# Otherwise run interactively |
|
while True: |
|
prompt = input("Imagine... > ").strip() |
|
if not prompt: print("Bye!") ; break |
|
await generate_image({'prompt': prompt}) |
|
|
|
def get_arguments(): |
|
# Define parser arguments for Usage and parsing |
|
# 'comfyui', 'workflow', and 'seed' are set to defaults if not provided |
|
parser = argparse.ArgumentParser(description="Generate images using ComfyUI. Enquote the prompt or escape all quotes. Without a prompt run interactively.") |
|
|
|
wgroup = parser.add_argument_group("workflow") |
|
wgroup.add_argument('--comfyui', help="ComfyUI base URL", default=os.environ.get('COMFYUI_BASE_URL', "http://127.0.0.1:8188")) |
|
wgroup.add_argument('--workflow', help="Choose a workflow template.", default=DEFAULT_WORKFLOW) |
|
wgroup.add_argument("--model", help="Override the model in the workflow.") |
|
|
|
ggroup = parser.add_argument_group("generation") |
|
ggroup.add_argument("--seed", help="Override the seed for the workflow.", default=random.randint(0, 18446744073709551614), type=int) |
|
ggroup.add_argument("--steps", help="Override the number of steps in the workflow.", type=int) |
|
ggroup.add_argument("--sampler", help="Override the sampler for the workflow. (e.g., ddim, euler, dpmpp_2m...)") |
|
ggroup.add_argument("--scheduler", help="Override the scheduler for the workflow. (e.g., normal, karras, sgm_uniform, simple...)") |
|
ggroup.add_argument("--negative", help="Provide a negative prompt. Take care to enquote.") |
|
|
|
sgroup = parser.add_argument_group("dimensions") |
|
sgroup.add_argument("--ratio", help="Set a w:h ratio, applying to the native pixel count. (e.g., 16:9)") |
|
sgroup.add_argument("--size", help="Override both image width and height in the workflow.", type=int) |
|
sgroup.add_argument("--width", help="Override image width in the workflow.", type=int) |
|
sgroup.add_argument("--height", help="Override image height in the workflow.", type=int) |
|
sgroup.add_argument("--scale", help="Override image scaling (if any) in the workflow.", type=float) |
|
|
|
parser.add_argument("prompt", nargs="?", help="Prompt text for image generation.") |
|
|
|
# Parse known arguments, forgiving extra and unknown arguments |
|
parsed_args = parser.parse_known_args() |
|
args = parsed_args[0] |
|
|
|
global COMFYUI_BASE_URL |
|
args.comfyui = args.comfyui.strip("/") |
|
COMFYUI_BASE_URL = args.comfyui |
|
|
|
# Ratio should not be combined with width, height, or size |
|
if args.ratio: |
|
if args.width or args.height or args.size: |
|
parser.error("--ratio cannot be combined with --width, --height, or --size.") |
|
elif not re.match(r"\d+:\d+", args.ratio) and not re.match(r"\d+/\d+", args.ratio): |
|
parser.error("--ratio must be in the form w:h or w/h.") |
|
|
|
if args.size: |
|
print(f"Size: {args.size}") |
|
if args.size < 512: |
|
parser.error("--size must be at least 512 pixels.") |
|
elif args.size % 64 != 0: |
|
parser.error("--size must be a multiple of 64 pixels.") |
|
else: |
|
if not args.width: args.width = args.size |
|
if not args.height: args.height = args.size |
|
|
|
if args.width: |
|
if args.width % 64 != 0: |
|
parser.error("--width must be a multiple of 64 pixels.") |
|
elif args.width < 256: |
|
parser.error("--width must be at least 256 pixels.") |
|
elif args.height and args.width * args.height < 512 * 512: |
|
parser.error("--width * --height must be at least 256K pixels.") |
|
|
|
if args.height: |
|
if args.height % 64 != 0: |
|
parser.error("--height must be a multiple of 64 pixels.") |
|
elif args.height < 256: |
|
parser.error("--height must be at least 256 pixels.") |
|
|
|
if args.prompt: |
|
args.prompt = args.prompt.strip() |
|
more = parsed_args[1] |
|
if len(more): |
|
args.prompt += " " + " ".join(more) |
|
if more[0].startswith("--"): |
|
unk = more[0].split("=")[0] |
|
parser.error(f"Unknown argument {unk}") |
|
|
|
return args |
|
|
|
def main(): |
|
# Randomize for this session |
|
random.seed(int(os.urandom(10).hex(), 16)) |
|
|
|
# Get arguments |
|
args = get_arguments() |
|
|
|
# We need asynchronous I/O |
|
asyncio.run(imagine(args)) |
|
|
|
if __name__ == "__main__": |
|
main() |