Skip to content

Instantly share code, notes, and snippets.

@hansthen
Last active May 25, 2025 19:14
Show Gist options
  • Save hansthen/4844fb643bb742edc0868f9a0d49d527 to your computer and use it in GitHub Desktop.
Save hansthen/4844fb643bb742edc0868f9a0d49d527 to your computer and use it in GitHub Desktop.
fetchlib with timeouts
import aiohttp
import asyncio
from datetime import datetime
from aiostream import stream as aiozip
from pipe import Pipe
import json
import jq
async def schedule(intervals=(1, 1, 20, 1, 1)):
    """Yield ``None`` forever, pausing between ticks per a repeating pattern.

    A tick is emitted first, then the generator sleeps for the next delay
    in ``intervals``; once the pattern is exhausted it starts over.

    Args:
        intervals: Iterable of delays in seconds (anything accepted by
            ``asyncio.sleep``). Defaults to the original hard-coded
            pattern ``(1, 1, 20, 1, 1)``.

    Yields:
        ``None`` for every tick.

    Raises:
        ValueError: On the first iteration if ``intervals`` is empty. An
            empty pattern would otherwise loop forever without ever
            yielding or awaiting, blocking the event loop.
    """
    # Materialise once so one-shot iterables can be replayed on every cycle.
    delays = tuple(intervals)
    if not delays:
        raise ValueError("intervals must not be empty")

    while True:
        for delay in delays:
            yield
            await asyncio.sleep(delay)
@Pipe
async def timeout(stream, duration):
    """Tag stream activity, emitting "timeout" when a full tick passes idle.

    Two async generators share one boolean and are merged into a single
    output stream:

    * the relay yields "schedule" for every item read from ``stream`` and
      clears the shared flag;
    * the watchdog wakes every ``duration`` seconds, yields "timeout" if
      the flag is still set from its previous wake-up (i.e. the relay has
      not cleared it in between), and then sets the flag again.

    Args:
        stream: Async iterable to monitor; the items themselves are discarded.
        duration: Watchdog period in seconds, passed to ``asyncio.sleep``.

    Yields:
        The string "schedule" or "timeout".
    """
    idle = False

    async def watchdog():
        nonlocal idle
        # No exit condition: this loop runs for as long as it is iterated.
        # NOTE(review): presumably the merged stream therefore keeps emitting
        # "timeout" after `stream` is exhausted - confirm against the
        # aiostream merge semantics.
        while True:
            if idle:
                yield "timeout"
            idle = True
            await asyncio.sleep(duration)

    async def relay():
        nonlocal idle
        async for _ignored in stream:
            idle = False
            yield "schedule"

    merged = aiozip.merge(watchdog(), relay())
    async with merged.stream() as events:
        async for event in events:
            yield event
@Pipe
async def timeout2(stream, duration):
    """Yield "schedule" per source item and "timeout" per idle ``duration``.

    The next item is fetched in a background task that is kept alive across
    timeouts. Waiting with ``asyncio.wait`` (instead of ``asyncio.wait_for``)
    means an expired wait does not cancel the in-flight fetch, so the source
    iterator is not torn down by the first timeout.

    Args:
        stream: Async iterable to monitor; the items themselves are discarded.
        duration: Maximum number of seconds to wait before reporting a timeout.

    Yields:
        "schedule" when an item arrived, "timeout" when ``duration`` seconds
        elapsed without one. The generator finishes when ``stream`` is
        exhausted; an exception raised by ``stream`` propagates to the
        consumer.
    """
    source = aiter(stream)

    async def pull():
        # True -> an item arrived; False -> the source is exhausted.
        # StopAsyncIteration must be absorbed here: letting it escape into
        # the surrounding async generator would surface as RuntimeError.
        try:
            await anext(source)
        except StopAsyncIteration:
            return False
        return True

    pending = None
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(pull())
            # asyncio.wait leaves the task running when the timeout expires.
            done, _ = await asyncio.wait({pending}, timeout=duration)
            if not done:
                yield "timeout"
                continue
            finished, pending = pending, None
            if not finished.result():
                return
            yield "schedule"
    finally:
        # Do not leave a dangling fetch behind if the consumer stops early.
        if pending is not None:
            pending.cancel()
@Pipe
async def timeout3(stream, duration):
    """Yield "schedule" per source item and "timeout" per idle ``duration``.

    A background task drains ``stream`` into a one-slot queue while this
    generator waits on the queue with a timeout. End of input is signalled
    with a sentinel placed on the queue, so the last item is never dropped
    and no trailing "timeout" is emitted once the source has finished.

    Args:
        stream: Async iterable to monitor; the items themselves are discarded.
        duration: Maximum number of seconds to wait before reporting a timeout.

    Yields:
        "schedule" when an item arrived, "timeout" when ``duration`` seconds
        elapsed without one.

    Raises:
        Exception: Whatever ``stream`` raised, re-raised to the consumer
            after the items that preceded the failure have been reported.
    """
    queue = asyncio.Queue(1)
    end_of_stream = object()
    failure = None

    async def drain(source):
        nonlocal failure
        try:
            async for item in source:
                await queue.put(item)
        except Exception as exc:
            # Hand the error over to the consumer instead of leaving it
            # unretrieved inside the background task.
            failure = exc
        await queue.put(end_of_stream)

    task = asyncio.create_task(drain(stream))
    try:
        while True:
            try:
                item = await asyncio.wait_for(queue.get(), duration)
            except asyncio.TimeoutError:
                # asyncio.TimeoutError is the builtin TimeoutError on 3.11+
                # and a distinct class before that; this name works on both.
                yield "timeout"
                continue
            if item is end_of_stream:
                break
            yield "schedule"
        if failure is not None:
            raise failure
    finally:
        # No-op when the task already finished; otherwise stops the drain
        # when the consumer abandons this generator early.
        task.cancel()
@Pipe
async def date(stream):
    """Pair each incoming item with the time at which it was received.

    Args:
        stream: Async iterable of items to stamp.

    Yields:
        A two-element list ``[timestamp, item]`` where ``timestamp`` is the
        value of ``datetime.now()`` taken when the item was read.
    """
    async for tag in stream:
        stamped = [datetime.now(), tag]
        yield stamped
@Pipe
async def display(stream):
    """Print every item of ``stream`` as one JSON document per line.

    Values that ``json`` cannot encode natively are converted with ``str``.
    Nothing is yielded, so piping a stream into this stage produces a
    coroutine that consumes the whole stream.

    Args:
        stream: Async iterable of items to print.
    """
    async for record in stream:
        line = json.dumps(record, default=str)
        print(line)
# Build the pipeline (ticks -> timeout tagging -> timestamping -> printing)
# and run it on a fresh event loop.
asyncio.run(
    schedule()
    | timeout(1)
    | date
    | display
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment