Skip to content

Instantly share code, notes, and snippets.

@eduardogpg
Last active February 12, 2025 15:30
Show Gist options
  • Save eduardogpg/0b0e596e522eeb9f61b1c55866984fe5 to your computer and use it in GitHub Desktop.
Save eduardogpg/0b0e596e522eeb9f61b1c55866984fe5 to your computer and use it in GitHub Desktop.
Script to kill psql queries.
import logging
from typing import Optional
from django.db import connection
from concurrent.futures import TimeoutError, Future
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Emit INFO and above from this logger (the PID and cancellation messages below are INFO/WARNING/ERROR).
logger.setLevel(logging.INFO)
class QueryTimeoutException(Exception):
    """Raised when a query exceeds its allowed execution time.

    Derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` handlers treat it like any other application error;
    ``BaseException`` is meant for interpreter-level exits such as
    ``KeyboardInterrupt`` and ``SystemExit``.

    Attributes:
        pid: PID of the PostgreSQL backend that was running the query, or
            ``None`` when it could not be determined.
    """

    def __init__(self, message: str, pid: Optional[int] = None):
        """Store the error message and the backend PID (if known)."""
        super().__init__(message)
        self.pid = pid
class PIDQueryProcess:
    """Write-once holder for the PID of a database backend process.

    The PID starts out as ``None`` and may be assigned once; assigning again
    after a non-``None`` value has been stored raises ``AttributeError``.
    """

    def __init__(self):
        """Create a holder with no PID recorded yet."""
        self._pid: Optional[int] = None

    @property
    def pid(self) -> Optional[int]:
        """Return the stored PID, or ``None`` if none has been set."""
        return self._pid

    @pid.setter
    def pid(self, value: int) -> None:
        """Store ``value`` unless a PID has already been recorded."""
        if self._pid is None:
            self._pid = value
            return
        raise AttributeError("PID is already set.")
def _execute_query(handle_query_future: Future, query: str, pid_holder: PIDQueryProcess):
    """Run ``query`` and publish its outcome on ``handle_query_future``.

    Before the query starts, the PID of the PostgreSQL backend serving the
    current connection is read with ``pg_backend_pid()`` and stored on
    ``pid_holder`` so that another connection can cancel the query.

    Args:
        handle_query_future: Future that receives either the fetched rows
            (``set_result``) or the exception raised while running the query
            (``set_exception``).
        query: SQL text to execute; it must produce a result set because
            ``fetchall()`` is called on it.
        pid_holder: Write-once holder that receives the backend PID.
    """
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT pg_backend_pid();")
            pid = cursor.fetchone()[0]
            pid_holder.pid = pid
            logger.info(f"Executing query with PID: {pid}")
            cursor.execute(query)
            result = cursor.fetchall()
    except Exception as exc:
        # Hand the failure to whoever waits on the future; otherwise a waiter
        # would block until its timeout and report a misleading timeout error.
        handle_query_future.set_exception(exc)
    else:
        handle_query_future.set_result(result)
def _cancel_postgres_query(pid: int) -> bool:
    """Ask PostgreSQL to cancel the query running on backend ``pid``.

    This is best-effort: any error raised while issuing the cancel request is
    logged and reported as a failed cancellation rather than propagated.

    Args:
        pid: PID of the backend process whose current query should be canceled.

    Returns:
        ``True`` if ``pg_cancel_backend`` reported success, ``False`` if it
        reported failure or the request itself raised an error.
    """
    try:
        with connection.cursor() as cursor:
            # Bind the PID as a query parameter instead of interpolating it
            # into the SQL text.
            cursor.execute("SELECT pg_cancel_backend(%s);", [pid])
            canceled = bool(cursor.fetchone()[0])
    except Exception as e:
        logger.error(f"Error canceling query with PID {pid}: {e}")
        return False

    if canceled:
        logger.info(f"Query with PID {pid} was successfully canceled.")
    else:
        logger.warning(f"Failed to cancel query with PID {pid}.")
    return canceled
def run_query_with_timeout(query: str, timeout: int = 60):
    """Run ``query`` and cancel it on the server if it exceeds ``timeout``.

    The query runs in a background daemon thread so that the calling thread
    can actually stop waiting after ``timeout`` seconds. Calling
    ``_execute_query`` directly in the caller's thread would block until the
    query finished, so the timeout branch could never trigger.

    NOTE: because the query runs in another thread it uses that thread's own
    database connection, so it does not take part in any transaction open on
    the caller's connection.

    Args:
        query: SQL text to execute; it must produce a result set.
        timeout: Maximum number of seconds to wait for the result.

    Returns:
        The rows returned by ``fetchall()`` for ``query``.

    Raises:
        QueryTimeoutException: If the query did not finish within ``timeout``
            seconds. The message states whether cancellation succeeded; the
            ``pid`` attribute carries the backend PID when it was known.
        Exception: Any error raised while executing the query is re-raised
            in the calling thread.
    """
    import threading  # local import keeps this block self-contained

    handle_query_future = Future()
    pid_holder = PIDQueryProcess()

    def _worker() -> None:
        """Execute the query in the background thread and clean up after it."""
        try:
            _execute_query(
                query=query,
                pid_holder=pid_holder,
                handle_query_future=handle_query_future,
            )
        except Exception as exc:
            # Make sure a failure reaches the waiting thread even if
            # _execute_query did not record it on the future itself.
            if not handle_query_future.done():
                handle_query_future.set_exception(exc)
        finally:
            # Database connections are per-thread; close this thread's
            # connection so it is not leaked when the thread ends.
            connection.close()

    threading.Thread(
        target=_worker,
        name="run-query-with-timeout",
        daemon=True,
    ).start()

    try:
        return handle_query_future.result(timeout=timeout)
    except TimeoutError as e:
        logger.warning(f"Query exceeded timeout of {timeout} seconds. Attempting to cancel...")
        pid = pid_holder.pid
        if pid is None:
            raise QueryTimeoutException("Query exceeded timeout but PID could not be retrieved.") from e
        if _cancel_postgres_query(pid):
            raise QueryTimeoutException(f"Query exceeded timeout and was canceled. PID: {pid}", pid) from e
        raise QueryTimeoutException(f"Query exceeded timeout but cancellation failed. PID: {pid}", pid) from e
# Test
def main(max_seconds: int = 120):
    """Manual smoke test: run a ``pg_sleep`` query and log a timeout error.

    Args:
        max_seconds: Number of seconds the test query sleeps on the server.
    """
    sleep_query = f"SELECT pg_sleep({max_seconds});"
    try:
        run_query_with_timeout(sleep_query)
    except QueryTimeoutException as timeout_error:
        logger.error(timeout_error)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment