Skip to content

Instantly share code, notes, and snippets.

View draincoder's full-sized avatar
😴

treaditup draincoder

😴
View GitHub Profile
@draincoder
draincoder / main.py
Last active December 11, 2024 14:20
FastStream Redis batch
import asyncio
from faststream import FastStream
from faststream.redis import RedisBroker, RedisMessage, StreamSub
broker = RedisBroker()
app = FastStream(broker)
@draincoder
draincoder / api.py
Created November 13, 2024 16:59
FastAPI + FastStream tracing
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from faststream.redis import RedisMessage
from faststream.redis.fastapi import RedisRouter
from faststream.redis.opentelemetry import RedisTelemetryMiddleware
router = RedisRouter()
@draincoder
draincoder / example.py
Created September 10, 2024 21:14
FastStream span linking example
import asyncio
from opentelemetry import trace
from opentelemetry.context import Context
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Link
@draincoder
draincoder / decorator.py
Last active November 14, 2024 07:50
A typed decorator with optional parameters for sync and async functions
import asyncio
from functools import wraps
from typing import Any, Callable, ParamSpec, TypeVar, cast, overload
F_Spec = ParamSpec("F_Spec")
F_Return = TypeVar("F_Return")
Func = Callable[F_Spec, F_Return]
@draincoder
draincoder / fastapi_depends.py
Last active February 28, 2024 23:42
Depends vs Dishka
import time
import anyio
from fastapi import Depends, FastAPI
app = FastAPI()
def sync_dependency_one() -> str:
time.sleep(0.00000001)
@draincoder
draincoder / tg_web_app_auth.py
Last active June 26, 2024 08:12
Python implementation of Telegram.WebApp.initData validation
import hashlib
import hmac
import json
from dataclasses import dataclass
from operator import itemgetter
from typing import Any
from urllib.parse import parse_qsl
@dataclass(eq=False)
@draincoder
draincoder / aiohttp_base_client.py
Last active January 23, 2024 21:38
Aiohttp base client for JSON API
import ssl
from types import TracebackType
from typing import Any, Type
import certifi
from aiohttp import ClientSession, TCPConnector, ClientTimeout, ClientResponse
from aiohttp.typedefs import StrOrURL
from aiohttp import ClientResponseError
Headers = dict[str, str] | None