Skip to content

Instantly share code, notes, and snippets.

@mvandermeulen
Forked from s3rgeym/app.py
Created January 21, 2025 15:41
Show Gist options
  • Save mvandermeulen/62c8f6b20bb2e9deb1ca0e82b90d41a4 to your computer and use it in GitHub Desktop.
Save mvandermeulen/62c8f6b20bb2e9deb1ca0e82b90d41a4 to your computer and use it in GitHub Desktop.
HH Applicant Telemetry Server
from __future__ import annotations
from datetime import datetime
from typing import Any, Dict, List, Optional, AsyncGenerator
from sqlalchemy.dialects.postgresql import TIMESTAMP
from fastapi import Depends, FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, ValidationError, validator
from pydantic_settings import BaseSettings
from sqlalchemy import JSON, Column, DateTime, Integer, String, func
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, sessionmaker
import json
from contextlib import asynccontextmanager
class Settings(BaseSettings):
DATABASE_URL: str
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
settings = Settings()
engine = create_async_engine(settings.DATABASE_URL, echo=True)
async_sessionmaker = sessionmaker(
bind=engine, class_=AsyncSession, expire_on_commit=False
)
class Base(DeclarativeBase):
pass
class VacancyModel(Base):
__tablename__ = "hh_vacancies"
id: Mapped[int] = mapped_column(primary_key=True, index=True, nullable=False)
name: Mapped[str] = mapped_column(String, index=True, nullable=True)
type: Mapped[str] = mapped_column(String, nullable=True)
area: Mapped[str] = mapped_column(String, nullable=True)
salary: Mapped[Dict[str, Any]] = mapped_column(JSON, nullable=True)
direct_url: Mapped[str] = mapped_column(String, nullable=True)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())
published_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True))
contacts: Mapped[Dict[str, Any]] = mapped_column(JSON, nullable=True)
employer_id: Mapped[int] = mapped_column(index=True, nullable=True)
employer: Mapped["EmployerModel"] = relationship(
"EmployerModel",
back_populates="vacancies",
)
class EmployerModel(Base):
__tablename__ = "hh_employers"
id: Mapped[int] = mapped_column(primary_key=True, index=True, nullable=False)
name: Mapped[str] = mapped_column(String, index=True, nullable=True)
type: Mapped[str] = mapped_column(String, nullable=True)
description: Mapped[str] = mapped_column(String, nullable=True)
site_url: Mapped[str] = mapped_column(String, nullable=True)
area: Mapped[str] = mapped_column(String, nullable=True)
vacancies: Mapped[List["VacancyModel"]] = relationship("VacancyModel", back_populates="employer")
class VacancyCreate(BaseModel):
name: Optional[str] = Field(None, max_length=1023)
type: Optional[str] = Field(None, max_length=31)
area: Optional[str] = Field(None, max_length=1023)
salary: Optional[Dict[str, Any]] = None
direct_url: Optional[str] = Field(None, max_length=4095)
created_at: Optional[datetime] = None
published_at: Optional[datetime] = None
contacts: Optional[Dict[str, Any]] = None
employer_id: Optional[int] = None
@validator('salary', 'contacts')
def validate_json_length(cls, value: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
if value and len(json.dumps(value)) > 4095:
raise ValueError('JSON data exceeds the maximum length of 4095 characters')
return value
class EmployerCreate(BaseModel):
name: Optional[str] = Field(None, max_length=1023)
type: Optional[str] = Field(None, max_length=31)
description: Optional[str] = Field(None, max_length=4095)
site_url: Optional[str] = Field(None, max_length=4095)
area: Optional[str] = Field(None, max_length=1023)
class TelemetryData(BaseModel):
vacancies: Optional[Dict[str, VacancyCreate]] = Field(default_factory=dict)
employers: Optional[Dict[str, EmployerCreate]] = Field(default_factory=dict)
app = FastAPI()
@app.exception_handler(RequestValidationError)
async def custom_validation_exception_handler(
request: Request, exc: RequestValidationError
) -> JSONResponse:
return JSONResponse(
status_code=422,
content={"error": type(exc).__name__, "error_details": exc.errors()},
)
@app.exception_handler(Exception)
async def unicorn_exception_handler(request: Request, exc: Exception) -> JSONResponse:
return JSONResponse(
status_code=400,
content={"error": type(exc).__name__, "error_details": str(exc)},
)
async def create_db_and_tables() -> None:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_sessionmaker() as session:
try:
yield session
finally:
await session.close()
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
# Before app starts
await create_db_and_tables()
try:
yield
finally:
# After the app stops
await engine.dispose()
app = FastAPI(lifespan=lifespan)
@app.get("/")
def root() -> Dict[str, str]:
return {"message": "It's alive!"}
@app.post("/collect")
async def collect_telemetry(
telemetry_data: TelemetryData, db: AsyncSession = Depends(get_db)
) -> Dict[str, bool]:
try:
vacancy_values = [
{"id": int(vacancy_id), **vacancy.dict()}
for vacancy_id, vacancy in telemetry_data.vacancies.items()
]
employer_values = [
{"id": int(employer_id), **employer.dict()}
for employer_id, employer in telemetry_data.employers.items()
]
insert_vacancy = insert(VacancyModel)
insert_employer = insert(EmployerModel)
await db.execute(
insert_vacancy.values(vacancy_values).on_conflict_do_update(
index_elements=[VacancyModel.id],
set_={
"name": insert_vacancy.excluded.name,
"type": insert_vacancy.excluded.type,
"area": insert_vacancy.excluded.area,
"salary": insert_vacancy.excluded.salary,
"direct_url": insert_vacancy.excluded.direct_url,
"created_at": insert_vacancy.excluded.created_at,
"published_at": insert_vacancy.excluded.published_at,
"contacts": insert_vacancy.excluded.contacts,
"employer_id": insert_vacancy.excluded.employer_id,
},
)
)
await db.execute(
insert_employer.values(employer_values).on_conflict_do_update(
index_elements=[EmployerModel.id],
set_={
"name": insert_employer.excluded.name,
"type": insert_employer.excluded.type,
"description": insert_employer.excluded.description,
"site_url": insert_employer.excluded.site_url,
"area": insert_employer.excluded.area,
},
)
)
await db.commit()
return {"success": True}
except Exception as ex:
await db.rollback()
raise ex
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment