- Callbacks = functions you pass into a function so it can notify you about events.
- For LLM streaming, three super-useful callbacks are:
  - `on_start()` – called once at the beginning
  - `on_token(text: str)` – called for each streamed token
  - `set_status(text: str)` – called whenever the function wants to update a status line
- This keeps your LLM logic clean and UI-agnostic while letting your REPL/UI control how things look.
- You can plug in:
  - `print` / `console.print`
  - Rich `status.update`
  - a class with `on_token` / `set_status` methods
- Pattern scales nicely from quick REPLs → full apps.
A callback is just a function you pass into another function:
```python
from typing import Callable, Optional

def do_something(on_event: Optional[Callable[[str], None]] = None) -> None:
    if on_event:
        on_event("Something happened!")
```

Usage:

```python
def printer(msg: str) -> None:
    print("EVENT:", msg)

do_something(on_event=printer)
```

The callee controls when `on_event` is called; the caller decides what it does.
Below is a realistic complete_turn function that:
- takes a `Turn` (your Pydantic model)
- streams from litellm (`acompletion(..., stream=True)`)
- supports three optional callbacks: `on_start()`, `on_token(text)`, `set_status(text)`
- returns a new `Turn` with the final assistant message + usage (no mutation)
```python
from __future__ import annotations

from typing import Any, Callable, List, Optional

from pydantic import BaseModel, Field
from litellm import acompletion, stream_chunk_builder
from litellm.utils import Message, Usage

DEFAULT_MODEL = "gpt-4o-mini"  # or your model


class Step(BaseModel):
    message: Message
    usage: Optional[Usage] = None


class Turn(BaseModel):
    steps: List[Step] = Field(default_factory=list)
    summary: Optional[str] = None

    def add_system(self, content: str, **extra: Any) -> None:
        self.steps.append(
            Step(message=Message(role="system", content=content, **extra))
        )

    def add_user(self, content: str, **extra: Any) -> None:
        self.steps.append(
            Step(message=Message(role="user", content=content, **extra))
        )

    def add_raw(
        self, message: Message | dict, usage: Optional[Usage] = None
    ) -> None:
        if isinstance(message, Message):
            self.steps.append(Step(message=message, usage=usage))
        else:
            self.steps.append(Step(message=Message(**message), usage=usage))


# Callback type aliases for clarity
OnStart = Callable[[], None]
OnToken = Callable[[str], None]
SetStatus = Callable[[str], None]


async def complete_turn(
    turn: Turn,
    model: str = DEFAULT_MODEL,
    on_start: Optional[OnStart] = None,
    on_token: Optional[OnToken] = None,
    set_status: Optional[SetStatus] = None,
) -> Turn:
    """
    Stream a completion for the given Turn and return a NEW Turn
    with the assistant's final message + usage appended.
    """
    messages = [step.message for step in turn.steps]

    if set_status:
        set_status("Preparing request…")
    if on_start:
        on_start()

    # Call litellm with streaming enabled
    stream = await acompletion(
        model=model,
        messages=messages,
        stream=True,
    )

    if set_status:
        set_status("Streaming response…")

    chunks = []

    # Stream token-by-token (or chunk-by-chunk)
    async for chunk in stream:
        chunks.append(chunk)
        delta = chunk.choices[0].delta
        text = delta.content or ""
        if text and on_token:
            on_token(text)

    if set_status:
        set_status("Finalizing…")

    # Rebuild the final response with litellm's helper
    final = stream_chunk_builder(chunks, messages=messages)
    msg: Message = final.choices[0].message
    usage: Usage = final.usage

    if set_status:
        set_status("Done.")

    # Return a *copy* so we don't mutate the original Turn
    new_turn = turn.model_copy(deep=True)
    new_turn.add_raw(msg, usage=usage)
    return new_turn
```

You can keep it simple:
```python
turn = Turn()
turn.add_system("You are a helpful assistant.")
turn.add_user("Explain pub/sub in simple terms.")

updated_turn = await complete_turn(
    turn,
    on_token=lambda t: print(t, end=""),
    set_status=lambda s: print(f"[STATUS] {s}"),
)
```

This will:
- print status updates like `[STATUS] Streaming response…`
- stream tokens as they arrive
- give you `updated_turn` containing the full assistant message + usage
```python
from rich.console import Console

console = Console()

updated_turn = await complete_turn(
    turn,
    on_token=lambda t: console.print(t, end=""),
    set_status=lambda s: console.print(f"[dim]{s}[/dim]"),
)
```

Now both tokens and status lines are using Rich formatting.
Rich has a slick status spinner API, and you can pass `status.update` directly as `set_status`:
```python
from rich.console import Console

console = Console()

async def run_turn(turn: Turn) -> Turn:
    with console.status("[bold blue]Waiting for model…") as status:
        updated_turn = await complete_turn(
            turn,
            on_token=lambda t: console.print(t, end=""),
            set_status=status.update,  # <-- callback into Rich status spinner
        )
    console.print()  # ensure we end on a new line
    return updated_turn
```

Inside `complete_turn`, calls like:

```python
set_status("Streaming response…")
```

will live-update the spinner text.
As your interface grows, you may want to bundle behavior together:
```python
from rich.console import Console

class TurnUI:
    def __init__(self, console: Console):
        self.console = console

    def set_status(self, text: str) -> None:
        self.console.print(f"[dim]{text}[/dim]")

    def on_token(self, text: str) -> None:
        self.console.print(text, end="")
```

Usage:
```python
console = Console()
ui = TurnUI(console)

updated_turn = await complete_turn(
    turn,
    on_token=ui.on_token,
    set_status=ui.set_status,
)
```

This pattern is nice because:
- it can hold state (buffers, counters, timestamps)
- it's testable and reusable
- it supports more methods later (`on_error`, `on_usage`, etc.); a sketch follows below
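For example, here's a minimal sketch of a stateful controller: it extends the `TurnUI` above with a token buffer and the hypothetical `on_error` hook this list mentions (note that `complete_turn` as written never calls `on_error`; you'd thread it through the same way `on_end` is added below):

```python
from rich.console import Console

class BufferedTurnUI:
    """Like TurnUI, but also accumulates the streamed text."""

    def __init__(self, console: Console):
        self.console = console
        self.tokens: list[str] = []  # per-turn state lives on the instance

    def set_status(self, text: str) -> None:
        self.console.print(f"[dim]{text}[/dim]")

    def on_token(self, text: str) -> None:
        self.tokens.append(text)  # remember every token
        self.console.print(text, end="")

    def on_error(self, exc: Exception) -> None:
        # Hypothetical hook: complete_turn doesn't call this yet.
        self.console.print(f"[red]Error:[/red] {exc}")

    @property
    def full_text(self) -> str:
        return "".join(self.tokens)
```

Testing it needs no model at all: instantiate the class, call `on_token` a few times, and assert on `full_text`.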
You might also want a callback for when the stream completes:
```python
from typing import Callable

OnEnd = Callable[[Message, Usage], None]
```

Update the signature:
```python
async def complete_turn(
    turn: Turn,
    model: str = DEFAULT_MODEL,
    on_start: Optional[OnStart] = None,
    on_token: Optional[OnToken] = None,
    set_status: Optional[SetStatus] = None,
    on_end: Optional[OnEnd] = None,
) -> Turn:
    ...
    final = stream_chunk_builder(chunks, messages=messages)
    msg: Message = final.choices[0].message
    usage: Usage = final.usage
    if on_end:
        on_end(msg, usage)
    ...
```

Usage:
```python
def on_end(msg: Message, usage: Usage) -> None:
    console.print()
    console.print(f"[green]Done[/green] (prompt={usage.prompt_tokens}, completion={usage.completion_tokens})")

updated_turn = await complete_turn(
    turn,
    on_token=lambda t: console.print(t, end=""),
    set_status=status.update,
    on_end=on_end,
)
```

You can capture state in callbacks using closures:
```python
buffer: list[str] = []

def on_token(text: str) -> None:
    buffer.append(text)
    console.print(text, end="")

updated_turn = await complete_turn(
    turn,
    on_token=on_token,
    set_status=status.update,
)

full_text = "".join(buffer)
```

No need for a class if you just want lightweight state.
- **Separation of concerns.** LLM logic (`complete_turn`) doesn't know or care about the UI. Your REPL / TUI / web UI just plugs in callbacks.
- **Reusability.** The same core function works for:
  - CLI REPL
  - Rich TUI
  - WebSocket streaming
  - logging-only mode (no UI at all; see the sketch after this list)
- **Async-friendly.** Works naturally with `async for` streaming from litellm.
- **Scales with complexity.** Start with simple lambdas, then move to controller objects, event buses, etc.
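As a sketch of that logging-only mode (my wiring, using only the standard `logging` module plus the `turn` and `complete_turn` from above):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("llm")

chunks_seen = 0

def count_token(text: str) -> None:
    # No UI at all: just keep a count for a post-run log line.
    global chunks_seen
    chunks_seen += 1

updated_turn = await complete_turn(
    turn,
    on_token=count_token,
    set_status=lambda s: logger.info("status: %s", s),
)
logger.info("stream finished after %d chunks", chunks_seen)
```

Nothing in `complete_turn` changed; only the callbacks did.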
- Use callbacks (`on_start`, `on_token`, `set_status`, `on_end`) to let your LLM runner notify the outside world without knowing how it's displayed.
- Pass in:
  - plain functions,
  - lambdas,
  - object methods (e.g. `status.update`),
  - or dedicated UI controller classes.
- This pattern is idiomatic, testable, and plays nicely with Rich and async streaming.
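The snippets above use top-level `await`, which assumes an async REPL or notebook. If you'd rather run things as a plain script, one possible wiring, sketched from the pieces above, is:

```python
import asyncio
from rich.console import Console

console = Console()

async def main() -> None:
    turn = Turn()
    turn.add_system("You are a helpful assistant.")
    turn.add_user("Explain pub/sub in simple terms.")

    # Spinner for status, plain streaming for tokens (same as run_turn above)
    with console.status("[bold blue]Waiting for model…") as status:
        updated = await complete_turn(
            turn,
            on_token=lambda t: console.print(t, end=""),
            set_status=status.update,
        )
    console.print()  # end on a clean line
    console.print(f"[dim]{len(updated.steps)} steps in turn[/dim]")

if __name__ == "__main__":
    asyncio.run(main())
```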