Skip to content

Instantly share code, notes, and snippets.

@hansthen
Created May 25, 2025 17:14
Show Gist options
  • Save hansthen/d9e9957c88a850dc11aa7b0894a163c8 to your computer and use it in GitHub Desktop.
Save hansthen/d9e9957c88a850dc11aa7b0894a163c8 to your computer and use it in GitHub Desktop.
fetchlib
import asyncio
import json
import os

import aiohttp
import jq
from pipe import Pipe
# Google Geocoding API key. Read from the GOOGLE_MAPS_API_KEY environment
# variable so the secret does not have to be committed to the source; the
# original placeholder is kept as the fallback for backward compatibility.
api_key = os.environ.get("GOOGLE_MAPS_API_KEY", "<your key>")
async def schedule(interval):
    """Emit an endless stream of ticks, pausing between them.

    The first tick is produced immediately; after each tick is consumed the
    generator sleeps for ``interval`` seconds before producing the next one.

    Args:
        interval: Delay, in seconds, passed to ``asyncio.sleep``.

    Yields:
        ``None`` for every tick; only the timing carries information.
    """
    while True:
        yield None
        await asyncio.sleep(interval)
@Pipe
async def iss(stream):
    """Fetch satellite 25544 from wheretheiss.at once per upstream item.

    Args:
        stream: Async iterable used purely as a trigger; the values it
            produces are ignored.

    Yields:
        The decoded JSON body of each response.
    """
    endpoint = "https://api.wheretheiss.at/v1/satellites/25544"
    # One session is shared by every request made during the stream.
    async with aiohttp.ClientSession() as http:
        async for _tick in stream:
            async with http.get(endpoint) as resp:
                yield await resp.json()
@Pipe
async def reverse_geocode(stream, api_key):
    """Attach Google reverse-geocoding results to each incoming position.

    Args:
        stream: Async iterable of dicts that provide ``latitude`` and
            ``longitude`` entries.
        api_key: Google Geocoding API key, sent as the ``key`` query
            parameter.

    Yields:
        The same dict object that was received, with the decoded JSON
        response stored under the ``"reverse"`` key.

    Raises:
        KeyError: If an item lacks ``latitude`` or ``longitude``.
    """
    # Loop-invariant, so defined once instead of on every iteration.
    endpoint = "https://maps.googleapis.com/maps/api/geocode/json"
    async with aiohttp.ClientSession() as session:
        async for item in stream:
            # The query goes through ``params`` so the key is URL-encoded and
            # is never run through ``str.format`` (a key containing ``{`` or
            # ``}`` used to corrupt the URL template).
            query = {
                "latlng": "{latitude},{longitude}".format(**item),
                "key": api_key,
            }
            async with session.get(endpoint, params=query) as response:
                item["reverse"] = await response.json()
            yield item
@Pipe
async def jq_transform(stream, program):
    """Run a jq program over every item and emit each result separately.

    Args:
        stream: Async iterable of values handed to the jq program as input.
        program: jq program text; compiled once before the stream is read.

    Yields:
        Every output the program produces for each input item, in order.
    """
    compiled = jq.compile(program)
    async for record in stream:
        outputs = compiled.input(record).all()
        for output in outputs:
            yield output
@Pipe
async def display(stream):
    """Drain the stream, printing each item as one line of JSON.

    Args:
        stream: Async iterable of values accepted by ``json.dumps``.
    """
    async for record in stream:
        line = json.dumps(record)
        print(line)
# jq program: collect the geocoding results whose ``types`` is not exactly
# ["plus_code"], then keep the first of them.
select_address = """
[.reverse.results[] | select(.types != ["plus_code"])] | first
"""

# Every 10 seconds: fetch the ISS position, reverse-geocode it, reduce the
# response with the jq program above, and print the result.
asyncio.run(
    schedule(10)
    | iss
    | reverse_geocode(api_key)
    | jq_transform(select_address)
    | display
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment