@cnolanminich
Created October 20, 2025 18:23

concurrent_airflow_dagruns
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.settings import Session
from airflow.models import DagRun
from datetime import timedelta
import pandas as pd
import numpy as np
# Constants for flexibility
BUCKET_SIZE_SECONDS = 5
DAYS_BACK = 7
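
# Note on sizing (added for context, not in the original gist): with 5-second buckets
# the analysis below builds 86400 / 5 = 17,280 buckets per day, i.e. roughly 138k
# bucket rows across the DAYS_BACK + 1 = 8 days that get enumerated.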


def export_dag_run_metadata_to_local(**kwargs):
    # Query DagRun metadata for the lookback window
    from airflow.utils import timezone

    cutoff_time = timezone.utcnow() - timedelta(days=DAYS_BACK)
    session = Session()
    dag_runs = (
        session.query(DagRun)
        .filter(DagRun.start_date >= cutoff_time)
        .order_by(DagRun.start_date.desc())
        .all()
    )
    session.close()
    # Convert the DagRun rows to a DataFrame
    data = [{
        "dag_id": dr.dag_id,
        "run_id": dr.run_id,
        "execution_date": dr.execution_date.isoformat() if dr.execution_date else None,
        "start_date": dr.start_date.isoformat() if dr.start_date else None,
        "end_date": dr.end_date.isoformat() if dr.end_date else None,
        "state": dr.state,
    } for dr in dag_runs]
    df = pd.DataFrame(data)

    if df.empty:
        print("No DAG runs found. Exiting early.")
        return
    # Normalize start and end dates as tz-aware UTC timestamps, then fill
    # missing end_date (still-running DAG runs) with the current time
    df["start_date"] = pd.to_datetime(df["start_date"], utc=True)
    df["end_date"] = pd.to_datetime(df["end_date"], utc=True)
    df["end_date"] = df["end_date"].fillna(pd.Timestamp.utcnow())

    # Keep only runs that started within the lookback window
    cutoff = pd.Timestamp.utcnow().normalize() - pd.Timedelta(days=DAYS_BACK)
    df = df[df["start_date"] >= cutoff]
    # Build fixed-size time buckets across each day in the window
    buckets = []
    for day in pd.date_range(end=pd.Timestamp.utcnow().normalize(), periods=DAYS_BACK + 1):
        for second in range(0, 86400, BUCKET_SIZE_SECONDS):
            start_bucket = day + pd.Timedelta(seconds=second)
            end_bucket = start_bucket + pd.Timedelta(seconds=BUCKET_SIZE_SECONDS)
            buckets.append({
                "run_date": day.date(),
                "start_bucket": start_bucket,
                "end_bucket": end_bucket,
            })
    buckets_df = pd.DataFrame(buckets)
    # Count DAG runs whose [start_date, end_date] interval overlaps a given bucket
    def count_active_runs(row):
        return ((df["start_date"] < row["end_bucket"]) &
                (df["end_date"] >= row["start_bucket"])).sum()

    # Apply to all buckets
    buckets_df["runs_running"] = buckets_df.apply(count_active_runs, axis=1)
    # Aggregate to a daily concurrency summary (percentiles and max per day)
    summary_df = (
        buckets_df
        .groupby("run_date")["runs_running"]
        .agg(
            _5th_percentile=lambda x: int(np.percentile(x, 5)),
            _25th_percentile=lambda x: int(np.percentile(x, 25)),
            _50th_percentile=lambda x: int(np.percentile(x, 50)),
            _75th_percentile=lambda x: int(np.percentile(x, 75)),
            _90th_percentile=lambda x: int(np.percentile(x, 90)),
            _95th_percentile=lambda x: int(np.percentile(x, 95)),
            max_concurrent_runs="max",
        )
        .reset_index()
    )
    # Log output to the Airflow task logs
    print("\n" + summary_df.to_string())
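
    # Optional sketch (an assumption, not in the original gist): the function and DAG
    # description mention exporting to a local file, but the code above only logs the
    # summary. A minimal local export could look like this; the path is illustrative
    # and must be writable by the Airflow worker.
    summary_df.to_csv("/tmp/dag_run_concurrency_summary.csv", index=False)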


with DAG(
    dag_id="dag_run_metadata_local",
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
    description="Export DAG run metadata to local file",
) as dag:
    export_metadata = PythonOperator(
        task_id="export_dag_run_metadata",
        python_callable=export_dag_run_metadata_to_local,
        provide_context=True,
    )