Skip to content

Instantly share code, notes, and snippets.

@atifaziz
Last active April 10, 2025 10:10
Show Gist options
  • Save atifaziz/1bc8e086b58487a6d0342e6f9d17be8f to your computer and use it in GitHub Desktop.
Save atifaziz/1bc8e086b58487a6d0342e6f9d17be8f to your computer and use it in GitHub Desktop.
Demonstration of controlled & concurrent processing of JSON messages while reading synchronously from a file queue
# /// script
# requires-python = ">=3.12"
# dependencies = []
# ///
import asyncio
from functools import partial
import json
import os
import sys
import time
from collections.abc import Iterator
from pathlib import Path
from random import random
from threading import Event
from typing import Generic, Protocol, Required, TypeVar, TypedDict
# Message type produced by a reader; covariant because readers only ever
# hand messages out, they never accept them.
_M = TypeVar("_M", covariant=True)


class MessageReader(Protocol[_M]):
    """Structural type of a callable that opens a stream of messages.

    An implementation is called with a keyword-only ``stop_event`` and
    returns an iterator over messages of type ``_M``.
    """

    def __call__(self, *, stop_event: Event) -> Iterator[_M]:
        ...
class Message(TypedDict, total=False):
id: Required[int]
def read_tail(
    file_path: Path,
    /,
    *,
    stop_event: Event,
    poll_interval: float = 0.5,
) -> Iterator[Message]:
    """Yield JSON objects from lines appended to a file, like ``tail -f``.

    Reading starts at the current end of the file, so lines already present
    when the generator starts are skipped. A line is only parsed once its
    terminating newline has been written; a partially written line is kept
    in a buffer until the rest arrives. Lines that are not valid JSON are
    reported on stderr and skipped.

    Args:
        file_path: File to follow. It is read as UTF-8.
        stop_event: Checked before every read; once set, the generator ends.
            It is also used for the idle wait, so setting it interrupts the
            wait instead of letting the full interval elapse.
        poll_interval: Seconds to wait at end of file before trying again.

    Yields:
        The decoded JSON value of each complete, valid line.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        f.seek(0, os.SEEK_END)  # Only lines appended from now on are of interest
        pending = ""  # Beginning of a line whose newline has not been written yet
        while not stop_event.is_set():
            chunk = f.readline()
            if not chunk.endswith("\n"):
                # End of file reached: either nothing new (empty chunk) or the
                # writer is in the middle of a line. Keep what we have and
                # wait for more instead of parsing a fragment.
                pending += chunk
                stop_event.wait(poll_interval)
                continue
            line, pending = pending + chunk, ""
            print(f"Read new line: {line}", file=sys.stderr, end="")
            # Keep the try narrow so that only parsing errors are handled
            # here, not exceptions raised into the generator at the yield.
            try:
                message = json.loads(line)
            except json.JSONDecodeError:
                print("Error decoding JSON!", file=sys.stderr)
                continue
            yield message
async def process_message(message: Message) -> None:
    """Pretend to handle ``message`` by sleeping for a random whole number of seconds.

    Progress is printed to stdout. If the coroutine is cancelled, that is
    reported on stderr and the cancellation is propagated to the caller.
    """
    try:
        duration = int(random() * 3 + 1)  # random() is below 1, so 1, 2 or 3
        msg_id = message["id"]
        print(f"Processing message #{msg_id} (duration: {duration}s)")
        await asyncio.sleep(duration)  # Stand-in for the real work
        print(f"Processed message #{msg_id}")
    except asyncio.CancelledError:
        print(f"Processing of message #{msg_id} was cancelled", file=sys.stderr)
        raise
async def main(
    *,
    message_reader: MessageReader[Message],
    max_concurrency: int = 5,
    stop_timeout: float = 2,
) -> None:
    """Read messages in a worker thread and process them concurrently.

    ``message_reader`` is iterated in the event loop's default executor
    because it blocks. Every message it yields is handed over to the event
    loop through a queue and processed by its own task, with at most
    ``max_concurrency`` tasks running at any time.

    If the reader ends on its own, the tasks still running are allowed to
    finish and the function returns; if the reader failed, its exception is
    re-raised. If this coroutine is interrupted (for example cancelled), the
    reader is asked to stop via its stop event, the tasks still running are
    cancelled, and the interrupting exception is re-raised.

    Args:
        message_reader: Callable returning the (blocking) message iterator.
        max_concurrency: Upper bound on simultaneously running tasks.
        stop_timeout: Seconds to wait for the reader thread to finish after
            it has been asked to stop.

    Raises:
        ValueError: If ``max_concurrency`` is smaller than 1.
    """
    if max_concurrency < 1:
        # A semaphore of 0 would block the first message forever.
        raise ValueError("max_concurrency must be at least 1")

    stop_event = Event()
    event_loop = asyncio.get_running_loop()
    # ``None`` marks the end of the stream; the reader thread posts it last.
    queue: asyncio.Queue[Message | None] = asyncio.Queue()

    def process() -> None:
        """Run in the worker thread: forward every message to the loop's queue."""
        print("Starting reading of messages", file=sys.stderr)
        try:
            for message in message_reader(stop_event=stop_event):
                print(f"Posting message #{message['id']} to queue", file=sys.stderr)
                # asyncio.Queue is not thread-safe, so let the loop do the put.
                event_loop.call_soon_threadsafe(queue.put_nowait, message)
            print("Stopped reading of messages", file=sys.stderr)
        finally:
            # Always wake the consumer, even when the reader failed; otherwise
            # it would wait on an empty queue forever.
            try:
                event_loop.call_soon_threadsafe(queue.put_nowait, None)
            except RuntimeError:
                pass  # The loop is already closed; nobody is left to notify.

    # Run the blocking reader in a separate thread
    process_future = event_loop.run_in_executor(None, process)
    semaphore = asyncio.Semaphore(max_concurrency)
    tasks: set[asyncio.Task[None]] = set()

    def on_task_done(task: asyncio.Task[None]) -> None:
        """Free the task's concurrency slot and report a failure, if any."""
        print(f"Task '{task.get_name()}' completed", file=sys.stderr)
        semaphore.release()
        tasks.discard(task)
        # Retrieve the exception so that it is reported here rather than as
        # an "exception was never retrieved" warning later on.
        if not task.cancelled() and (error := task.exception()) is not None:
            print(f"Task '{task.get_name()}' failed: {error!r}", file=sys.stderr)

    try:
        while (message := await queue.get()) is not None:
            await semaphore.acquire()  # Wait for a free slot
            task = event_loop.create_task(process_message(message), name=f"message#{message['id']}")
            tasks.add(task)
            task.add_done_callback(on_task_done)
        # The reader is done: let the running tasks finish, then surface the
        # reader's exception if it ended because of one.
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        await process_future
    except BaseException:
        # Deliberately broad (includes cancellation): clean up, then re-raise.
        print("Signaling stop", file=sys.stderr)
        stop_event.set()
        if not process_future.done():
            try:
                print("Waiting for queue reading to stop", file=sys.stderr)
                await asyncio.wait_for(process_future, timeout=stop_timeout)
                print("Queue reading stopped", file=sys.stderr)
            except asyncio.TimeoutError:
                print("Timed out waiting for queue reader to stop", file=sys.stderr)
            except Exception as error:
                # Report it, but do not let it replace the original exception.
                print(f"Queue reader failed: {error!r}", file=sys.stderr)
        in_flight = list(tasks)
        for task in in_flight:
            task.cancel()
        if in_flight:
            await asyncio.gather(*in_flight, return_exceptions=True)
        raise
if __name__ == "__main__":
match sys.argv:
case [_, arg]:
asyncio.run(main(message_reader=partial(read_tail, Path(arg))))
case _:
print(f"Usage: {Path(__file__).name} <file_path>")
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment