Comprehensive Testing Plan for Migration 0092_3_2_0_ui_improvements_for_deadlines.py
In addition to this README, there are three files in this Gist:
[COMPRESSED]: Tests the migration when COMPRESS_SERIALIZED_DAGS is True.
[UNCOMPRESSED]: Tests the migration when COMPRESS_SERIALIZED_DAGS is False, which is the Airflow default setting.
[UNIFIED]: All steps for both of the above tests combined into one file, in case that is more convenient.
Note 1:
In order to compare your output directly to the Expected Output, copy the Dag file from Appendix 1 of the
test file into your files/dags directory and ensure there are no other Dags. The Appendix is the same in all
three files, so you only need to do this step once.
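For example, from the root of your Airflow checkout (the filename deadline_test_dags.py is just an illustration; use whatever name you saved the Appendix code under):
mkdir -p files/dags
cp deadline_test_dags.py files/dags/
ls files/dags/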
Note 2:
In all Expected Output sections where you see a timestamp or UUID, you can assume that the exact
values are not important unless otherwise noted. Ensuring that your output is properly formatted
in those spots is sufficient for these tests. For example, the timestamp you see won't be the same
time, but should be formatted the same.
Note 3:
Depending on your exact configuration your outputs may show some deprecation warnings not included
in the Expected Output sections. Those are fine for the purposes of this test and can be ignored.
Note 4:
For comprehensive testing we need to exercise the AIRFLOW__CORE__COMPRESS_SERIALIZED_DAGS configuration
setting in both its True and False states. If you are using the [UNIFIED] file, all steps
are identical unless otherwise indicated. Look for the [COMPRESSED ONLY] or [UNCOMPRESSED ONLY] tags.
This document contains ONLY the steps of the Unified Plan for the case where
COMPRESS_SERIALIZED_DAGS is True. Steps ending in "B" differ from their Uncompressed
counterparts, and steps with non-integer numbering (such as 2.5B) are additional steps that validate data compression
is working correctly and keep the numbering of the main steps aligned.
Upgrade testing
Phase 1: Setup and Baseline Validation
Prerequisite:
Create a file files/airflow-breeze-config/init.sh with the contents
export AIRFLOW__CORE__COMPRESS_SERIALIZED_DAGS=True
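If the airflow-breeze-config directory does not exist yet, one way to create the file (a sketch, run from the root of your Airflow checkout):
mkdir -p files/airflow-breeze-config
echo 'export AIRFLOW__CORE__COMPRESS_SERIALIZED_DAGS=True' > files/airflow-breeze-config/init.sh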
Step U-0: Start Airflow with your Dags and a fresh DB
breeze start-airflow --db-reset
Expected Output:
You should be in the Breeze environment with all processes running.
[Optional] Step U-0.5: QoL Step
I recommend you open a new terminal and run breeze exec so you have a fresh full-sized terminal that is
easier to copy/paste from. Alternatively, if you want to stay in tmux, you may find it easier to zoom into
just the one pane with CTRL-b z (pressing it again undoes the zoom).
Step U-1: Unpause and Trigger a Dag to populate the deadline table
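The exact mechanism is up to you; the UI works, or you can use the Airflow CLI from inside the Breeze environment. A sketch (the choice of duplicate_deadline_one_of_two is just an example; any Dag from the Appendix with a DeadlineAlert will do):
airflow dags unpause duplicate_deadline_one_of_two
airflow dags trigger duplicate_deadline_one_of_two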
################# STEP U-2.5B ################
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT dag_id, CASE WHEN data_compressed IS NOT NULL AND (data IS NULL OR data::text = 'null') THEN 'data_compressed' WHEN data IS NOT NULL AND data::text != 'null' THEN 'data' ELSE 'neither' END as storage_column, length(data_compressed) as compressed_size_bytes FROM serialized_dag ORDER BY dag_id;"
Expected Output:
7 rows, matching the number of Dags from Step U-2, all of which should show data_compressed in the storage_column.
Note 2: The exact output here will depend on whether you have run the tests previously. For this one command,
and only this command, you may see several warnings that some Dags have "non-string deadline values",
and you may or may not see the statistics reported; that is fine this once.
Performing downgrade with database postgresql+psycopg2://postgres:***@postgres/airflow
2025-10-31T21:49:17.784487Z [info ] Attempting downgrade to revision b87d2135fa50 [airflow.utils.db] loc=db.py:1205
2025-10-31T21:49:17.839842Z [info ] Applying downgrade migrations to Airflow database. [airflow.utils.db] loc=db.py:1218
2025-10-31T21:49:17.870823Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-10-31T21:49:17.870997Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-10-31T21:49:17.912714Z [info ] Running downgrade 55297ae24532 -> b87d2135fa50, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Downgrade complete
Alternative acceptable output will look something like this:
Performing downgrade with database postgresql+psycopg2://postgres:***@postgres/airflow
Warning: About to reverse schema migrations for the airflow metastore. Please ensure you have backed up your database before any upgrade or downgrade operation. Proceed? (y/n)
y
2025-11-01T03:58:56.202545Z [info ] Attempting downgrade to revision b87d2135fa50 [airflow.utils.db] loc=db.py:1205
2025-11-01T03:58:56.282798Z [info ] Applying downgrade migrations to Airflow database. [airflow.utils.db] loc=db.py:1218
2025-11-01T03:58:56.325404Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:58:56.325597Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:58:56.371662Z [info ] Running downgrade 55297ae24532 -> b87d2135fa50, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Using migration_batch_size of 10000 as set in Airflow configuration.
Starting downgrade of 7 Dags with DeadlineAlerts in 1 batches.
Processing batch 1...
WARNING: Dag duplicate_deadline_one_of_two has non-string deadline values, skipping
WARNING: Dag duplicate_deadline_two_of_two has non-string deadline values, skipping
WARNING: Dag two_unique_deadlines has non-string deadline values, skipping
WARNING: Dag unique_deadline_one_of_two has non-string deadline values, skipping
WARNING: Dag unique_deadline_two_of_two has non-string deadline values, skipping
Batch 1 of 1 complete.
Processed 7 serialized_dag records (7 unique Dags), 0 had DeadlineAlerts.
Restored 0 DeadlineAlert configurations to original format.
No Dags encountered errors during downgrade.
Downgrade complete
Step U-5: Verify downgrade worked
############## STEP U-5 #############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT tablename FROM pg_tables WHERE tablename = 'deadline_alert';"
Expected Output:
The deadline_alert table should not exist after the downgrade:
tablename
-----------
(0 rows)
Step U-6B: Validate data is intact after downgrade
############### STEP U-6B ##############
python3 << 'EOF'
import json
import zlib

import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("""
    SELECT dag_id, data_compressed
    FROM serialized_dag
    ORDER BY dag_id
""")
row_count = 0
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_data = dag_data['dag']['deadline']
    if deadline_data:
        if row_count == 0:
            print(f"dag_id|status|deadline_data")
        print(f"{dag_id}|HAS_DEADLINE|{json.dumps(deadline_data, indent=2)}")
        print()
        row_count += 1
print(f"({row_count} row{'s' if row_count != 1 else ''})")
cur.close()
conn.close()
EOF
Expected Output:
Save this output somewhere. It is fairly long; the most important thing for now is that the final line
should say 5 rows, matching dags_with_deadlines from Step U-2. The exact JSON deadline data will be used
later to validate that the data was migrated without modification, and to validate the downgrade.
Step U-8.5B: Verify Migration Worked with Compressed Data
################# STEP U-8.5B ################
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT dag_id, CASE WHEN data_compressed IS NOT NULL THEN 'compressed' WHEN data IS NOT NULL THEN 'uncompressed' END as storage_type FROM serialized_dag ORDER BY dag_id;"
Expected Output:
7 rows, matching the total_dags from Step U-2, all showing "compressed".
############### STEP U-10 ##############
psql postgresql://postgres:airflow@postgres/airflow -A -c "SELECT serialized_dag_id, reference, interval, callback_def, created_at FROM deadline_alert ORDER BY serialized_dag_id;"
Expected Output:
There should be 6 rows, matching total_deadline_alerts from Step U-2, and the data in the columns
should match the data as presented in the long json output from Step U-6.
serialized_dag_id|reference|interval|callback_def|created_at
duplicate_deadline_one_of_two|{"reference_type": "DagRunQueuedAtDeadline"}|1|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.131234+00
duplicate_deadline_two_of_two|{"reference_type": "DagRunQueuedAtDeadline"}|1|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.048992+00
two_unique_deadlines|{"reference_type": "DagRunQueuedAtDeadline"}|30|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Early warning."}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.07874+00
two_unique_deadlines|{"reference_type": "DagRunQueuedAtDeadline"}|40|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Last call."}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.07874+00
unique_deadline_one_of_two|{"datetime": 334720800.0, "reference_type": "FixedDatetimeDeadline"}|0|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Deadline in the past; Alert should trigger immediately!"}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:52.961688+00
unique_deadline_two_of_two|{"reference_type": "DagRunQueuedAtDeadline"}|20|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Long deadline and a short task; Alert should not trigger."}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:52.992653+00
(6 rows)
Step U-11: Verify foreign key relationships
############### STEP U-11 ##############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT tc.constraint_name, tc.table_name, kcu.column_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema = tc.table_schema WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name IN ('deadline_alert', 'deadline');"
############### STEP U-12 ##############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT sd.dag_id, da.id as deadline_alert_id, COUNT(d.id) as linked_deadlines FROM deadline_alert da JOIN serialized_dag sd ON da.serialized_dag_id = sd.id LEFT JOIN deadline d ON d.deadline_alert_id = da.id GROUP BY sd.dag_id, da.id ORDER BY sd.dag_id;"
Expected Output:
There should be 6 rows, matching total_deadline_alerts from Step U-2, and since we only executed one
Dag, only that one should have a linked deadline.
################ STEP U-15B ###############
python3 << 'EOF'
import json
import zlib

import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("""
    SELECT sd.dag_id, sd.data_compressed, da.id::text, da.interval
    FROM serialized_dag sd
    JOIN deadline_alert da ON da.serialized_dag_id = sd.id
    WHERE sd.dag_id = 'two_unique_deadlines'
    ORDER BY da.interval
""")
print("        dag_id        |        uuid_in_serialized_dag        |        uuid_in_deadline_alert        | interval")
print("----------------------+--------------------------------------+--------------------------------------+----------")
row_count = 0
for dag_id, data_compressed, alert_id, interval in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_uuids = dag_data['dag']['deadline']
    # Check if this alert_id is in the deadline array
    if alert_id in deadline_uuids:
        print(f" {dag_id:20} | {alert_id:36} | {alert_id:36} | {interval:8}")
        row_count += 1
print(f"({row_count} rows)")
cur.close()
conn.close()
EOF
Expected Output:
Two rows (matching the number of DeadlineAlerts in the dag named two_unique_deadlines). Each row should
show matching deadline_alert_ids in both the uuid_in_serialized_dag and uuid_in_deadline_alert columns.
################## STEP U-15.5B #################
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT sd.dag_id, length(sd.data_compressed) as compressed_bytes, COUNT(da.id) as deadline_count FROM serialized_dag sd LEFT JOIN deadline_alert da ON da.serialized_dag_id = sd.id WHERE sd.data_compressed IS NOT NULL AND sd.dag_id LIKE '%deadline%' GROUP BY sd.dag_id, sd.data_compressed ORDER BY sd.dag_id;"
Expected Output:
7 rows, matching the total_dags from Step U-2. Each listed Dag will have the correct number of deadlines
matching Step U-3.
If you plan to continue with the compressed downgrade testing, no action is needed.
Otherwise, you will likely want to either remove the file created in the Step U-0 prerequisite
or comment out the export line by adding a # in front of it.
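For the comment-out option, a sketch using sed (editing the file by hand works just as well):
sed -i 's/^export AIRFLOW__CORE__COMPRESS_SERIALIZED_DAGS=True/# &/' files/airflow-breeze-config/init.sh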
Phase 4: Validation Summary
Step U-16: Compare migration output with baseline data
Compare the migration console output counts with Step U-3 baseline counts
Verify: Processed X Dags matches total_dags from baseline
Verify: Y had DeadlineAlerts matches dags_with_deadlines from baseline
Verify: Migrated Z DeadlineAlert configurations matches total_deadline_alerts from baseline
Expected Results
✅ Successful Migration Indicators:
All baseline Dag counts match migration output
DeadlineAlert table contains expected number of records
Foreign key constraints are properly created and functional
No errors during migration process
❌ Failure Indicators:
Count mismatches between baseline and migration output
Missing DeadlineAlert records
Foreign key constraint errors
Migration errors or exceptions
Downgrade Testing
Phase 1: Pre-Downgrade Validation
Step D-0: Ensure you are starting from a fully upgraded state
If you are continuing from the Upgrade Test above, you are ready to go. If not, ensure the following
prerequisites are met before continuing:
Database should have the deadline_alert table populated
serialized_dag should contain UUID arrays in the deadline field
All upgrade tests (Steps 1-15) should have passed
Ensure that files/airflow-breeze-config/init.sh exists from step U-0
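A quick sanity check for the first and last of these (reusing the same connection string as the other steps; the count should match total_deadline_alerts from Step U-2):
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT COUNT(*) FROM deadline_alert;"
cat files/airflow-breeze-config/init.sh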
[Optional] Step D-0.5: QoL Step
As above, I recommend you open a new terminal and run breeze exec so you have a fresh full-sized
terminal that is easier to copy/paste from. Alternatively, if you want to stay in tmux, you may find it
easier to zoom into just the one pane with CTRL-b z (pressing it again undoes the zoom).
Step D-1B: Verify current state has UUID arrays in serialized_dag
############### STEP D-1B ##############
python3 << 'EOF'
import json
import zlib

import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("SELECT dag_id, data_compressed FROM serialized_dag ORDER BY dag_id")
print("dag_id|deadline_type|deadline_count|deadline_data")
row_count = 0
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_data = dag_data['dag']['deadline']
    if deadline_data:
        deadline_type = 'array' if isinstance(deadline_data, list) else 'object'
        deadline_count = len(deadline_data) if isinstance(deadline_data, list) else 1
        print(f"{dag_id}|{deadline_type}|{deadline_count}|{json.dumps(deadline_data, indent=4)}")
        row_count += 1
print(f"({row_count} rows)")
cur.close()
conn.close()
EOF
Expected Output:
All Dags with deadlines should show deadline_type as 'array' and contain UUID strings (not objects).
There should be 5 rows matching dags_with_deadlines from Step U-2 in the upgrade test.
Step D-2.5B: Verify Data is Still Compressed Before Downgrade
################# STEP D-2.5B ################
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT dag_id, CASE WHEN data_compressed IS NOT NULL THEN 'compressed' WHEN data IS NOT NULL THEN 'uncompressed' END as storage_type FROM serialized_dag ORDER BY dag_id;"
Expected Output:
7 rows, matching the total_dags from step D-2, each showing "compressed". Output should be identical to
step U-8.5B
No errors (some deprecation warnings may be printed depending on your environment and are acceptable)
The printed counts should be accurate and match the output of Step U-2 (7 Dags, 5 have DeadlineAlerts, 6 total configurations)
Processing batch 2... should not be printed
The last line should say "Downgrade complete"
Performing downgrade with database postgresql+psycopg2://postgres:***@postgres/airflow
Warning: About to reverse schema migrations for the airflow metastore. Please ensure you have backed up your database before any upgrade or downgrade operation. Proceed? (y/n)
y
2025-11-01T03:29:39.332589Z [info ] Attempting downgrade to revision b87d2135fa50 [airflow.utils.db] loc=db.py:1205
2025-11-01T03:29:39.399138Z [info ] Applying downgrade migrations to Airflow database. [airflow.utils.db] loc=db.py:1218
2025-11-01T03:29:39.435089Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:29:39.435300Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:29:39.486819Z [info ] Running downgrade 55297ae24532 -> b87d2135fa50, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Using migration_batch_size of 10000 as set in Airflow configuration.
Starting downgrade of 7 Dags with DeadlineAlerts in 1 batches.
Processing batch 1...
Batch 1 of 1 complete.
Processed 7 serialized_dag records (7 unique Dags), 5 had DeadlineAlerts.
Restored 6 DeadlineAlert configurations to original format.
No Dags encountered errors during downgrade.
Downgrade complete
Phase 3: Validate Downgrade Results
Step D-5: Verify deadline_alert table was dropped
############# PHASE 3 ############
############# STEP D-5 #############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT tablename FROM pg_tables WHERE tablename = 'deadline_alert';"
Expected Output:
The deadline_alert table should not exist after the downgrade.
tablename
-----------
(0 rows)
Step D-6B: Verify data was restored to original format
############### STEP D-6B ##############
python3 << 'EOF'
import json
import zlib

import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("""
    SELECT dag_id, data_compressed
    FROM serialized_dag
    ORDER BY dag_id
""")
row_count = 0
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_data = dag_data['dag']['deadline']
    if deadline_data:
        if row_count == 0:
            print(f"dag_id|status|deadline_data")
        print(f"{dag_id}|HAS_DEADLINE|{json.dumps(deadline_data, indent=2)}")
        print()
        row_count += 1
print(f"({row_count} row{'s' if row_count != 1 else ''})")
cur.close()
conn.close()
EOF
Expected Output:
This output should EXACTLY match the output saved from Step U-6. There should be 5 rows, matching the
dags_with_alerts from step D-2, with the full object structure (not UUID strings).
One row showing the complete callback structure with __data__, __version__, and __classname__ fields. Note
that the order in which they appear is not important, as long as they are all there.
dag_id|full_callback_structure
unique_deadline_one_of_two|{
"__data__": {
"path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier",
"kwargs": {
"text": "Deadline in the past; Alert should trigger immediately!"
}
},
"__version__": 0,
"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback"
}
(1 row)
Phase 4: Re-run Upgrade to Verify Idempotency
Step D-11: Run upgrade migration again
############# PHASE 4 ############
############## STEP D-11 ##############
airflow db migrate
Expected Output:
The migration should run successfully again, producing the same results as the first upgrade.
DB: postgresql+psycopg2://postgres:***@postgres/airflow
Performing upgrade to the metadata database postgresql+psycopg2://postgres:***@postgres/airflow
2025-11-01T03:36:22.259966Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:36:22.260149Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:36:22.268030Z [info ] Migrating the Airflow database [airflow.utils.db] loc=db.py:1131
2025-11-01T03:36:22.280921Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:36:22.281204Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:36:22.331632Z [info ] Running upgrade b87d2135fa50 -> 55297ae24532, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Using migration_batch_size of 10000 as set in Airflow configuration.
Starting migration of 7 Dags in 1 batches.
Processing batch 1...
Batch 1 of 1 complete.
Processed 7 serialized_dag records (7 unique Dags), 5 had DeadlineAlerts.
Migrated 6 DeadlineAlert configurations.
No Dags encountered errors during migration.
2025-11-01T03:36:22.377244Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:36:22.377478Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
Database migrating done!
Step D-12B: Verify deadline_alert_id arrays were recreated
################ STEP D-12B ###############
python3 << 'EOF'
import json
import zlib

import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("SELECT dag_id, data_compressed FROM serialized_dag ORDER BY dag_id")
print("            dag_id             | deadline_type | deadline_count")
print("-------------------------------+---------------+----------------")
row_count = 0
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_data = dag_data['dag']['deadline']
    if deadline_data:
        deadline_type = 'array' if isinstance(deadline_data, list) else 'object'
        deadline_count = len(deadline_data) if isinstance(deadline_data, list) else 1
        print(f" {dag_id:29} | {deadline_type:13} | {deadline_count:14}")
        row_count += 1
print(f"({row_count} rows)")
cur.close()
conn.close()
EOF
Expected Output:
All 5 Dags should show array type with UUID strings (matching Step D-1 output).
✅ deadline_alert table was properly dropped and recreated
✅ All deadline columns were removed and re-added
✅ Re-upgrade produced identical results to first upgrade
✅ Multi-deadline Dag structure preserved through full cycle
✅ Callback structure with __classname__, __version__, and __data__ restored correctly
Expected Results
✅ Successful Downgrade Indicators:
Downgrade migration completes without errors
deadline_alert table is dropped
serialized_dag data exactly matches pre-upgrade format
All deadline alert objects have correct nested structure
Re-upgrade produces identical results to first upgrade
No data loss or corruption
❌ Failure Indicators:
Downgrade migration errors or exceptions
serialized_dag data doesn't match original format
Missing or incorrect __type, __var, __classname fields
Interval values changed (e.g., 1.0 became 1)
Multi-deadline arrays corrupted or missing elements
Re-upgrade produces different results
Appendix 1: Sample Dags
Ensure there are at least three Dags, one with a single DeadlineAlert, one with multiple DeadlineAlerts, and
one without. Below is the Dag file I used for the testing. The callback will fail without a Slack connection
configured, but that is enough for our testing purposes.
from datetime import timedelta, datetime

from airflow import DAG
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import task
from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, AsyncCallback


@task.bash(task_id='sleep_task')
def sleep_10_secs():
    return 'sleep 10'


with DAG(dag_id='no_deadline_1'):
    EmptyOperator(task_id='hello_world')

with DAG(dag_id='no_deadline_2'):
    EmptyOperator(task_id='hello_world_again')

with DAG(
    dag_id="unique_deadline_one_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.FIXED_DATETIME(datetime(1980, 8, 10, 2)),
        interval=timedelta(0),
        callback=AsyncCallback(
            SlackWebhookNotifier,
            {"text": "Deadline in the past; Alert should trigger immediately!"},
        )
    )
):
    sleep_10_secs()

with DAG(
    dag_id="unique_deadline_two_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=20),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={"text": "Long deadline and a short task; Alert should not trigger."},
        )
    )
):
    EmptyOperator(task_id='empty_task')

with DAG(
    dag_id="two_unique_deadlines",
    deadline=[
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(seconds=30),
            callback=AsyncCallback(
                callback_callable=SlackWebhookNotifier,
                kwargs={"text": "Early warning."},
            )
        ),
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(seconds=40),
            callback=AsyncCallback(
                callback_callable=SlackWebhookNotifier,
                kwargs={"text": "Last call."},
            )
        ),
    ]
):
    EmptyOperator(task_id='empty_task')

with DAG(
    dag_id="duplicate_deadline_one_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=1),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "},
        )
    )
):
    sleep_10_secs()

with DAG(
    dag_id="duplicate_deadline_two_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=1),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "},
        )
    )
):
    sleep_10_secs()
This document contains ONLY the steps of the Unified Plan for the case where COMPRESS_SERIALIZED_DAGS is False.
Upgrade testing
Phase 1: Setup and Baseline Validation
Step U-0: Start Airflow with your Dags and a fresh DB
breeze start-airflow --db-reset
Expected Output:
You should be in the Breeze environment with all processes running.
[Optional] Step U-0.5: QoL Step
I recommend you open a new terminal and run breeze exec so you have a fresh full-sized terminal that is
easier to copy/paste from. Alternatively, if you want to stay in tmux, you may find it easier to zoom into
just the one pane with CTRL-b z (pressing it again undoes the zoom).
Step U-1: Unpause and Trigger a Dag to populate the deadline table
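One way to do this from inside the Breeze environment (the choice of duplicate_deadline_one_of_two is just an example; any Dag from the Appendix with a DeadlineAlert will do):
airflow dags unpause duplicate_deadline_one_of_two
airflow dags trigger duplicate_deadline_one_of_two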
############## STEP U-2 #############
psql postgresql://postgres:airflow@postgres/airflow -c "WITH latest_dags AS ( SELECT DISTINCT ON (dag_id) CASE WHEN data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') THEN CASE WHEN jsonb_typeof(data->'dag'->'deadline') = 'array' THEN jsonb_array_length(data->'dag'->'deadline') ELSE 1 END ELSE 0 END as deadline_count FROM serialized_dag WHERE data IS NOT NULL ORDER BY dag_id, last_updated DESC) SELECT COUNT(*) as total_dags, COUNT(*) FILTER (WHERE deadline_count > 0) as dags_with_deadlines, SUM(deadline_count) as total_deadline_alerts FROM latest_dags;"
############## STEP U-3 #############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT dag_id, CASE WHEN data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') THEN 'HAS_DEADLINE' ELSE 'NO_DEADLINE' END as deadline_status, CASE WHEN data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') THEN CASE WHEN jsonb_typeof(data->'dag'->'deadline') = 'array' THEN jsonb_array_length(data->'dag'->'deadline') ELSE 1 END ELSE 0 END as deadline_count FROM ( SELECT DISTINCT ON (dag_id) dag_id, data FROM serialized_dag WHERE data IS NOT NULL ORDER BY dag_id, last_updated DESC) latest_dags ORDER BY dag_id;"
Expected Output:
7 rows, matching total_dags from U-2, all Dags are listed, and the correct number of deadlines are
indicated for each:
Note 2: The exact output here will depend on whether you have run the tests previously. For this one command,
and only this command, you may see several warnings that some Dags have "non-string deadline values",
and you may or may not see the statistics reported; that is fine this once.
Performing downgrade with database postgresql+psycopg2://postgres:***@postgres/airflow
2025-10-31T21:49:17.784487Z [info ] Attempting downgrade to revision b87d2135fa50 [airflow.utils.db] loc=db.py:1205
2025-10-31T21:49:17.839842Z [info ] Applying downgrade migrations to Airflow database. [airflow.utils.db] loc=db.py:1218
2025-10-31T21:49:17.870823Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-10-31T21:49:17.870997Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-10-31T21:49:17.912714Z [info ] Running downgrade 55297ae24532 -> b87d2135fa50, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Downgrade complete
Alternative acceptable output will look something like this:
Performing downgrade with database postgresql+psycopg2://postgres:***@postgres/airflow
Warning: About to reverse schema migrations for the airflow metastore. Please ensure you have backed up your database before any upgrade or downgrade operation. Proceed? (y/n)
y
2025-11-01T03:58:56.202545Z [info ] Attempting downgrade to revision b87d2135fa50 [airflow.utils.db] loc=db.py:1205
2025-11-01T03:58:56.282798Z [info ] Applying downgrade migrations to Airflow database. [airflow.utils.db] loc=db.py:1218
2025-11-01T03:58:56.325404Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:58:56.325597Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:58:56.371662Z [info ] Running downgrade 55297ae24532 -> b87d2135fa50, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Using migration_batch_size of 10000 as set in Airflow configuration.
Starting downgrade of 5 Dags with DeadlineAlerts in 1 batches.
Processing batch 1...
WARNING: Dag duplicate_deadline_one_of_two has non-string deadline values, skipping
WARNING: Dag duplicate_deadline_two_of_two has non-string deadline values, skipping
WARNING: Dag two_unique_deadlines has non-string deadline values, skipping
WARNING: Dag unique_deadline_one_of_two has non-string deadline values, skipping
WARNING: Dag unique_deadline_two_of_two has non-string deadline values, skipping
Batch 1 of 1 complete.
Processed 7 serialized_dag records (7 unique DAGs), 5 had DeadlineAlerts.
Restored 7 DeadlineAlert configurations to original format.
No Dags encountered errors during downgrade.
Downgrade complete
Step U-5: Verify downgrade worked
############## STEP U-5 #############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT tablename FROM pg_tables WHERE tablename = 'deadline_alert';"
Expected Output:
The deadline_alert table should not exist after the downgrade:
tablename
-----------
(0 rows)
Step U-6: Validate data is intact after downgrade
############## STEP U-6 #############
psql postgresql://postgres:airflow@postgres/airflow -A -c "SELECT dag_id, CASE WHEN data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') THEN 'HAS_DEADLINE' ELSE 'NO_DEADLINE' END as status, jsonb_pretty(data->'dag'->'deadline') as deadline_data FROM ( SELECT DISTINCT ON (dag_id) dag_id, data FROM serialized_dag WHERE data IS NOT NULL ORDER BY dag_id, last_updated DESC) latest_dags WHERE data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') ORDER BY dag_id;"
Expected Output:
Save this output somewhere. It is fairly long; the most important thing for now is that the final line
should say 5 rows, matching dags_with_deadlines from Step U-2. The exact JSON deadline data will be used
later to validate that the data was migrated without modification, and to validate the downgrade.
############### STEP U-10 ##############
psql postgresql://postgres:airflow@postgres/airflow -A -c "SELECT serialized_dag_id, reference, interval, callback_def, created_at FROM deadline_alert ORDER BY serialized_dag_id;"
Expected Output:
There should be 6 rows, matching total_deadline_alerts from Step U-2, and the data in the columns
should match the data as presented in the long json output from Step U-6.
serialized_dag_id|reference|interval|callback_def|created_at
duplicate_deadline_one_of_two|{"reference_type": "DagRunQueuedAtDeadline"}|1|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.131234+00
duplicate_deadline_two_of_two|{"reference_type": "DagRunQueuedAtDeadline"}|1|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.048992+00
two_unique_deadlines|{"reference_type": "DagRunQueuedAtDeadline"}|30|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Early warning."}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.07874+00
two_unique_deadlines|{"reference_type": "DagRunQueuedAtDeadline"}|40|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Last call."}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.07874+00
unique_deadline_one_of_two|{"datetime": 334720800.0, "reference_type": "FixedDatetimeDeadline"}|0|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Deadline in the past; Alert should trigger immediately!"}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:52.961688+00
unique_deadline_two_of_two|{"reference_type": "DagRunQueuedAtDeadline"}|20|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Long deadline and a short task; Alert should not trigger."}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:52.992653+00
(6 rows)
Step U-11: Verify foreign key relationships
############### STEP U-11 ##############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT tc.constraint_name, tc.table_name, kcu.column_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema = tc.table_schema WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name IN ('deadline_alert', 'deadline');"
############### STEP U-12 ##############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT sd.dag_id, da.id as deadline_alert_id, COUNT(d.id) as linked_deadlines FROM deadline_alert da JOIN serialized_dag sd ON da.serialized_dag_id = sd.id LEFT JOIN deadline d ON d.deadline_alert_id = da.id GROUP BY sd.dag_id, da.id ORDER BY sd.dag_id;"
Expected Output:
There should be 6 rows, matching total_deadline_alerts from Step U-2, and since we only executed one
Dag, only that one should have a linked deadline.
############### STEP U-15 ##############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT sd.dag_id, uuid_val as uuid_in_serialized_dag, da.id::text as uuid_in_deadline_alert, da.interval FROM serialized_dag sd CROSS JOIN LATERAL jsonb_array_elements_text(sd.data->'dag'->'deadline') AS uuid_val JOIN deadline_alert da ON da.id::text = uuid_val WHERE sd.dag_id = 'two_unique_deadlines' ORDER BY da.interval;"
Expected Output:
Two rows (matching the number of DeadlineAlerts in the dag named two_unique_deadlines). Each row should
show matching deadline_alert_ids in both the uuid_in_serialized_dag and uuid_in_deadline_alert columns.
Step U-16: Compare migration output with baseline data
Compare the migration console output counts with Step U-3 baseline counts
Verify: Processed X Dags matches total_dags from baseline
Verify: Y had DeadlineAlerts matches dags_with_deadlines from baseline
Verify: Migrated Z DeadlineAlert configurations matches total_deadline_alerts from baseline
Expected Results
✅ Successful Migration Indicators:
All baseline Dag counts match migration output
DeadlineAlert table contains expected number of records
Foreign key constraints are properly created and functional
No errors during migration process
❌ Failure Indicators:
Count mismatches between baseline and migration output
Missing DeadlineAlert records
Foreign key constraint errors
Migration errors or exceptions
Downgrade Testing
Phase 1: Pre-Downgrade Validation
Step D-0: Ensure you are starting from a fully upgraded state
If you are continuing from the Upgrade Test above, you are ready to go. If not, ensure the following
prerequisites are met before continuing:
Database should have the deadline_alert table populated
serialized_dag should contain UUID arrays in the deadline field
All upgrade tests (Steps 1-15) should have passed
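A quick way to confirm the first prerequisite (the count should match total_deadline_alerts from Step U-2):
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT COUNT(*) FROM deadline_alert;"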
[Optional] Step D-0.5: QoL Step
As above, I recommend you open a new terminal and run breeze exec so you have a fresh full-sized
terminal that is easier to copy/paste from. Alternatively, if you want to stay in tmux, you may find it
easier to zoom into just the one pane with CTRL-b z (pressing it again undoes the zoom).
Step D-1: Verify current state has UUID arrays in serialized_dag
############## STEP D-1 #############
psql postgresql://postgres:airflow@postgres/airflow -A -c "SELECT dag_id, jsonb_typeof(data->'dag'->'deadline') as deadline_type, jsonb_array_length(data->'dag'->'deadline') as deadline_count, jsonb_pretty(data->'dag'->'deadline') as deadline_data FROM serialized_dag WHERE data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb ORDER BY dag_id;"
Expected Output:
All Dags with deadlines should show deadline_type as 'array' and contain UUID strings (not objects).
There should be 5 rows matching dags_with_deadlines from Step U-2 in the upgrade test.
No errors (some deprecation warnings may be printed depending on your environment and are acceptable)
The printed counts should be accurate and match the output of Step U-2 (7 Dags, 5 have DeadlineAlerts, 6 total configurations)
Processing batch 2... should not be printed
The last line should say "Downgrade complete"
Performing downgrade with database postgresql+psycopg2://postgres:***@postgres/airflow
Warning: About to reverse schema migrations for the airflow metastore. Please ensure you have backed up your database before any upgrade or downgrade operation. Proceed? (y/n)
y
2025-11-01T03:29:39.332589Z [info ] Attempting downgrade to revision b87d2135fa50 [airflow.utils.db] loc=db.py:1205
2025-11-01T03:29:39.399138Z [info ] Applying downgrade migrations to Airflow database. [airflow.utils.db] loc=db.py:1218
2025-11-01T03:29:39.435089Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:29:39.435300Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:29:39.486819Z [info ] Running downgrade 55297ae24532 -> b87d2135fa50, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Using migration_batch_size of 10000 as set in Airflow configuration.
Starting downgrade of 5 Dags with DeadlineAlerts in 1 batches.
Processing batch 1...
Batch 1 of 1 complete.
Processed 7 serialized_dag records (7 unique Dags), 5 had DeadlineAlerts.
Restored 6 DeadlineAlert configurations to original format.
No Dags encountered errors during downgrade.
Downgrade complete
Phase 3: Validate Downgrade Results
Step D-5: Verify deadline_alert table was dropped
############# PHASE 3 ############
############# STEP D-5 #############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT tablename FROM pg_tables WHERE tablename = 'deadline_alert';"
Expected Output:
The deadline_alert table should not exist after the downgrade.
tablename
-----------
(0 rows)
Step D-6: Verify data was restored to original format
############## STEP D-6 #############
psql postgresql://postgres:airflow@postgres/airflow -A -c "SELECT dag_id, CASE WHEN data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') THEN 'HAS_DEADLINE' ELSE 'NO_DEADLINE' END as status, jsonb_pretty(data->'dag'->'deadline') as deadline_data FROM serialized_dag WHERE data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') ORDER BY dag_id;"
Expected Output:
This output should EXACTLY match the output saved from Step U-6. There should be 5 rows, matching the
dags_with_alerts from step D-2, with the full object structure (not UUID strings).
Step D-10: Verify callback structure was restored correctly
############### STEP D-10 ##############
psql postgresql://postgres:airflow@postgres/airflow -A -c "SELECT dag_id, jsonb_pretty(data->'dag'->'deadline'->0->'__var'->'callback_def') as full_callback_structure FROM serialized_dag WHERE dag_id = 'unique_deadline_one_of_two';"
Expected Output:
One row showing the complete callback structure with __data__, __version__, and __classname__ fields. Note
that the order in which they appear is not important, as long as they are all there.
dag_id|full_callback_structure
unique_deadline_one_of_two|{
"__data__": {
"path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier",
"kwargs": {
"text": "Deadline in the past; Alert should trigger immediately!"
}
},
"__version__": 0,
"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback"
}
(1 row)
Phase 4: Re-run Upgrade to Verify Idempotency
Step D-11: Run upgrade migration again
############# PHASE 4 ############
############## STEP D-11 ##############
airflow db migrate
Expected Output:
The migration should run successfully again, producing the same results as the first upgrade.
DB: postgresql+psycopg2://postgres:***@postgres/airflow
Performing upgrade to the metadata database postgresql+psycopg2://postgres:***@postgres/airflow
2025-11-01T03:36:22.259966Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:36:22.260149Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:36:22.268030Z [info ] Migrating the Airflow database [airflow.utils.db] loc=db.py:1131
2025-11-01T03:36:22.280921Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:36:22.281204Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:36:22.331632Z [info ] Running upgrade b87d2135fa50 -> 55297ae24532, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Using migration_batch_size of 10000 as set in Airflow configuration.
Starting migration of 7 Dags in 1 batches.
Processing batch 1...
Batch 1 of 1 complete.
Processed 7 serialized_dag records (7 unique Dags), 5 had DeadlineAlerts.
Migrated 6 DeadlineAlert configurations.
No Dags encountered errors during migration.
2025-11-01T03:36:22.377244Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:36:22.377478Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
Database migrating done!
Step D-12: Verify deadline_alert_id arrays were recreated
############### STEP D-12 ##############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT dag_id, jsonb_typeof(data->'dag'->'deadline') as deadline_type, jsonb_array_length(data->'dag'->'deadline') as deadline_count FROM serialized_dag WHERE data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb ORDER BY dag_id;"
Expected Output:
All 5 Dags should show array type with UUID strings (matching Step D-1 output).
✅ deadline_alert table was properly dropped and recreated
✅ All deadline columns were removed and re-added
✅ Re-upgrade produced identical results to first upgrade
✅ Multi-deadline Dag structure preserved through full cycle
✅ Callback structure with __classname__, __version__, and __data__ restored correctly
Expected Results
✅ Successful Downgrade Indicators:
Downgrade migration completes without errors
deadline_alert table is dropped
serialized_dag data exactly matches pre-upgrade format
All deadline alert objects have correct nested structure
Re-upgrade produces identical results to first upgrade
No data loss or corruption
❌ Failure Indicators:
Downgrade migration errors or exceptions
serialized_dag data doesn't match original format
Missing or incorrect __type, __var, __classname fields
Interval values changed (e.g., 1.0 became 1)
Multi-deadline arrays corrupted or missing elements
Re-upgrade produces different results
Appendix 1: Sample Dags
Ensure there are at least three Dags, one with a single DeadlineAlert, one with multiple DeadlineAlerts, and
one without. Below is the Dag file I used for the testing. The callback will fail without a Slack connection
configured, but that is enough for our testing purposes.
from datetime import timedelta, datetime

from airflow import DAG
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import task
from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, AsyncCallback


@task.bash(task_id='sleep_task')
def sleep_10_secs():
    return 'sleep 10'


with DAG(dag_id='no_deadline_1'):
    EmptyOperator(task_id='hello_world')

with DAG(dag_id='no_deadline_2'):
    EmptyOperator(task_id='hello_world_again')

with DAG(
    dag_id="unique_deadline_one_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.FIXED_DATETIME(datetime(1980, 8, 10, 2)),
        interval=timedelta(0),
        callback=AsyncCallback(
            SlackWebhookNotifier,
            {"text": "Deadline in the past; Alert should trigger immediately!"},
        )
    )
):
    sleep_10_secs()

with DAG(
    dag_id="unique_deadline_two_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=20),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={"text": "Long deadline and a short task; Alert should not trigger."},
        )
    )
):
    EmptyOperator(task_id='empty_task')

with DAG(
    dag_id="two_unique_deadlines",
    deadline=[
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(seconds=30),
            callback=AsyncCallback(
                callback_callable=SlackWebhookNotifier,
                kwargs={"text": "Early warning."},
            )
        ),
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(seconds=40),
            callback=AsyncCallback(
                callback_callable=SlackWebhookNotifier,
                kwargs={"text": "Last call."},
            )
        ),
    ]
):
    EmptyOperator(task_id='empty_task')

with DAG(
    dag_id="duplicate_deadline_one_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=1),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "},
        )
    )
):
    sleep_10_secs()

with DAG(
    dag_id="duplicate_deadline_two_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=1),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "},
        )
    )
):
    sleep_10_secs()
Step U-0: Start Airflow with your Dags and a fresh DB
[COMPRESSED ONLY]
Create a file files/airflow-breeze-config/init.sh with the contents
export AIRFLOW__CORE__COMPRESS_SERIALIZED_DAGS=True
[BOTH]
breeze start-airflow --db-reset
Expected Output:
You should be in the Breeze environment with all processes running.
[Optional] Step U-0.5: QoL Step
I recommend you open a new terminal and run breeze exec so you have a fresh full-sized terminal that is
easier to copy/paste from. Alternatively, if you want to stay in tmux, you may find it easier to zoom into
just the one pane with CTRL-b z (pressing it again undoes the zoom).
Step U-1: Unpause and Trigger a Dag to populate the deadline table
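One way to do this from inside the Breeze environment (the choice of duplicate_deadline_one_of_two is just an example; any Dag from the Appendix with a DeadlineAlert will do):
airflow dags unpause duplicate_deadline_one_of_two
airflow dags trigger duplicate_deadline_one_of_two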
NOTE: The command is different but the expected output is identical.
[UNCOMPRESSED ONLY]
############## STEP U-2 #############
psql postgresql://postgres:airflow@postgres/airflow -c "WITH latest_dags AS ( SELECT DISTINCT ON (dag_id) CASE WHEN data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') THEN CASE WHEN jsonb_typeof(data->'dag'->'deadline') = 'array' THEN jsonb_array_length(data->'dag'->'deadline') ELSE 1 END ELSE 0 END as deadline_count FROM serialized_dag WHERE data IS NOT NULL ORDER BY dag_id, last_updated DESC) SELECT COUNT(*) as total_dags, COUNT(*) FILTER (WHERE deadline_count > 0) as dags_with_deadlines, SUM(deadline_count) as total_deadline_alerts FROM latest_dags;"
[COMPRESSED ONLY] Step U-2.5B: Verify Compression is Working
################# STEP U-2.5B ################
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT dag_id, CASE WHEN data_compressed IS NOT NULL AND (data IS NULL OR data::text = 'null') THEN 'data_compressed' WHEN data IS NOT NULL AND data::text != 'null' THEN 'data' ELSE 'neither' END as storage_column, length(data_compressed) as compressed_size_bytes FROM serialized_dag ORDER BY dag_id;"
Expected Output:
7 rows, matching the number of Dags from Step U-2, all of which should show data_compressed in the storage_column.
NOTE: The command is different but the expected output is identical.
[UNCOMPRESSED ONLY]
############## STEP U-3 #############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT dag_id, CASE WHEN data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') THEN 'HAS_DEADLINE' ELSE 'NO_DEADLINE' END as deadline_status, CASE WHEN data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') THEN CASE WHEN jsonb_typeof(data->'dag'->'deadline') = 'array' THEN jsonb_array_length(data->'dag'->'deadline') ELSE 1 END ELSE 0 END as deadline_count FROM ( SELECT DISTINCT ON (dag_id) dag_id, data FROM serialized_dag WHERE data IS NOT NULL ORDER BY dag_id, last_updated DESC) latest_dags ORDER BY dag_id;"
[COMPRESSED ONLY]
############### STEP U-3B ##############
python3 << 'EOF'
import json
import zlib

import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("SELECT dag_id, data, data_compressed FROM serialized_dag ORDER BY dag_id")
print("            dag_id             | deadline_status | deadline_count")
print("-------------------------------+-----------------+----------------")
row_count = 0
for dag_id, data, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_data = dag_data['dag']['deadline']
    if deadline_data:
        status = 'HAS_DEADLINE'
        count = len(deadline_data) if isinstance(deadline_data, list) else 1
    else:
        status = 'NO_DEADLINE'
        count = 0
    print(f" {dag_id:29} | {status:15} | {count:14}")
    row_count += 1
print(f"({row_count} rows)")
cur.close()
conn.close()
EOF
Expected Output:
7 rows, matching total_dags from U-2, all Dags are listed, and the correct number of deadlines are
indicated for each:
Note 2: The exact output here will depend on whether you have run the tests previously. For this one command,
and only this command, you may see several warnings that some Dags have "non-string deadline values",
and you may or may not see the statistics reported; that is fine this once.
Performing downgrade with database postgresql+psycopg2://postgres:***@postgres/airflow
2025-10-31T21:49:17.784487Z [info ] Attempting downgrade to revision b87d2135fa50 [airflow.utils.db] loc=db.py:1205
2025-10-31T21:49:17.839842Z [info ] Applying downgrade migrations to Airflow database. [airflow.utils.db] loc=db.py:1218
2025-10-31T21:49:17.870823Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-10-31T21:49:17.870997Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-10-31T21:49:17.912714Z [info ] Running downgrade 55297ae24532 -> b87d2135fa50, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Downgrade complete
Alternative acceptable output will look something like this:
Performing downgrade with database postgresql+psycopg2://postgres:***@postgres/airflow
Warning: About to reverse schema migrations for the airflow metastore. Please ensure you have backed up your database before any upgrade or downgrade operation. Proceed? (y/n)
y
2025-11-01T03:58:56.202545Z [info ] Attempting downgrade to revision b87d2135fa50 [airflow.utils.db] loc=db.py:1205
2025-11-01T03:58:56.282798Z [info ] Applying downgrade migrations to Airflow database. [airflow.utils.db] loc=db.py:1218
2025-11-01T03:58:56.325404Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:58:56.325597Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:58:56.371662Z [info ] Running downgrade 55297ae24532 -> b87d2135fa50, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Using migration_batch_size of 10000 as set in Airflow configuration.
Starting downgrade of 5 Dags with DeadlineAlerts in 1 batches.
Processing batch 1...
WARNING: Dag duplicate_deadline_one_of_two has non-string deadline values, skipping
WARNING: Dag duplicate_deadline_two_of_two has non-string deadline values, skipping
WARNING: Dag two_unique_deadlines has non-string deadline values, skipping
WARNING: Dag unique_deadline_one_of_two has non-string deadline values, skipping
WARNING: Dag unique_deadline_two_of_two has non-string deadline values, skipping
Batch 1 of 1 complete.
Processed 7 serialized_dag records (7 unique Dags), 5 had DeadlineAlerts.
Restored 7 DeadlineAlert configurations to original format.
No Dags encountered errors during downgrade.
Downgrade complete
Step U-5: Verify downgrade worked
############## STEP U-5 #############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT tablename FROM pg_tables WHERE tablename = 'deadline_alert';"
Expected Output:
The deadline_alert table should not exist after the downgrade:
tablename
-----------
(0 rows)
Step U-6: Validate data is intact after downgrade
NOTE: The command is different but the expected output is identical.
[UNCOMPRESSED ONLY]
############## STEP U-6 #############
psql postgresql://postgres:airflow@postgres/airflow -A -c "SELECT dag_id, CASE WHEN data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') THEN 'HAS_DEADLINE' ELSE 'NO_DEADLINE' END as status, jsonb_pretty(data->'dag'->'deadline') as deadline_data FROM ( SELECT DISTINCT ON (dag_id) dag_id, data FROM serialized_dag WHERE data IS NOT NULL ORDER BY dag_id, last_updated DESC) latest_dags WHERE data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') ORDER BY dag_id;"
[COMPRESSED ONLY]
############### STEP U-6B ##############
python3 << 'EOF'
import json
import zlib
import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("""
    SELECT dag_id, data_compressed
    FROM serialized_dag
    ORDER BY dag_id
""")
row_count = 0
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_data = dag_data['dag']['deadline']
    if deadline_data:
        if row_count == 0:
            print(f"dag_id|status|deadline_data")
        print(f"{dag_id}|HAS_DEADLINE|{json.dumps(deadline_data, indent=2)}")
        print()
        row_count += 1
print(f"({row_count} row{'s' if row_count != 1 else ''})")
cur.close()
conn.close()
EOF
Expected Output For Both:
Save this output somewhere. The output is fairly long; what matters most for now is that the final line
says 5 rows, matching dags_with_deadlines from Step U-2. The exact JSON deadline data will be used
later to validate that the data was migrated without modification, and to validate the downgrade.
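[COMPRESSED ONLY, optional] If you would like that later comparison to be mechanical rather than visual, one
approach is to also snapshot the deadline data to a file. This is only a sketch; the /files/step_u6_deadlines.json
path is arbitrary and not part of the test plan, and the script only covers the compressed configuration:
python3 << 'EOF'
# Optional helper (hypothetical path): snapshot each Dag's deadline field to a JSON
# file so the Step D-6 output can be compared against it later.
import json
import zlib
import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("SELECT dag_id, data_compressed FROM serialized_dag ORDER BY dag_id")
snapshot = {}
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline = dag_data['dag'].get('deadline')
    if deadline:
        snapshot[dag_id] = deadline
cur.close()
conn.close()
with open('/files/step_u6_deadlines.json', 'w') as fh:
    json.dump(snapshot, fh, indent=2, sort_keys=True)
print(f"Saved deadline data for {len(snapshot)} Dags to /files/step_u6_deadlines.json")
EOF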
[COMPRESSED ONLY] Step U-8.5B: Verify Migration Worked with Compressed Data
################# STEP U-8.5B ################
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT dag_id, CASE WHEN data_compressed IS NOT NULL THEN 'compressed' WHEN data IS NOT NULL THEN 'uncompressed' END as storage_type FROM serialized_dag ORDER BY dag_id;"
Expected Output:
7 rows, matching the total_dags from Step U-2, all showing "compressed".
############### STEP U-10 ##############
psql postgresql://postgres:airflow@postgres/airflow -A -c "SELECT serialized_dag_id, reference, interval, callback_def, created_at FROM deadline_alert ORDER BY serialized_dag_id;"
Expected Output:
There should be 6 rows, matching total_deadline_alerts from Step U-2, and the data in the columns
should match the data as presented in the long json output from Step U-6.
serialized_dag_id|reference|interval|callback_def|created_at
duplicate_deadline_one_of_two|{"reference_type": "DagRunQueuedAtDeadline"}|1|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.131234+00
duplicate_deadline_two_of_two|{"reference_type": "DagRunQueuedAtDeadline"}|1|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.048992+00
two_unique_deadlines|{"reference_type": "DagRunQueuedAtDeadline"}|30|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Early warning."}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.07874+00
two_unique_deadlines|{"reference_type": "DagRunQueuedAtDeadline"}|40|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Last call."}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:53.07874+00
unique_deadline_one_of_two|{"datetime": 334720800.0, "reference_type": "FixedDatetimeDeadline"}|0|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Deadline in the past; Alert should trigger immediately!"}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:52.961688+00
unique_deadline_two_of_two|{"reference_type": "DagRunQueuedAtDeadline"}|20|{"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", "__data__": {"kwargs": {"text": "Long deadline and a short task; Alert should not trigger."}, "path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier"}, "__version__": 0}|2025-11-01 03:50:52.992653+00
(6 rows)
Step U-11: Verify foreign key relationships
############### STEP U-11 ##############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT tc.constraint_name, tc.table_name, kcu.column_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema = tc.table_schema WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_name IN ('deadline_alert', 'deadline');"
############### STEP U-12 ##############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT sd.dag_id, da.id as deadline_alert_id, COUNT(d.id) as linked_deadlines FROM deadline_alert da JOIN serialized_dag sd ON da.serialized_dag_id = sd.id LEFT JOIN deadline d ON d.deadline_alert_id = da.id GROUP BY sd.dag_id, da.id ORDER BY sd.dag_id;"
Expected Output:
There should be 6 rows, matching total_deadline_alerts from Step U-2, and since we only executed one
Dag, only that one should have a linked deadline.
NOTE: The command is different but the expected output is identical.
[UNCOMPRESSED ONLY]
############### STEP U-15 ##############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT sd.dag_id, uuid_val as uuid_in_serialized_dag, da.id::text as uuid_in_deadline_alert, da.interval FROM serialized_dag sd CROSS JOIN LATERAL jsonb_array_elements_text(sd.data->'dag'->'deadline') AS uuid_val JOIN deadline_alert da ON da.id::text = uuid_val WHERE sd.dag_id = 'two_unique_deadlines' ORDER BY da.interval;"
[COMPRESSED ONLY]
################ STEP U-15B ###############
python3 << 'EOF'
import json
import zlib
import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("""
    SELECT sd.dag_id, sd.data_compressed, da.id::text, da.interval
    FROM serialized_dag sd
    JOIN deadline_alert da ON da.serialized_dag_id = sd.id
    WHERE sd.dag_id = 'two_unique_deadlines'
    ORDER BY da.interval
""")
print(" dag_id | uuid_in_serialized_dag | uuid_in_deadline_alert | interval")
print("----------------------+--------------------------------------+--------------------------------------+----------")
row_count = 0
for dag_id, data_compressed, alert_id, interval in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_uuids = dag_data['dag']['deadline']
    # Check if this alert_id is in the deadline array
    if alert_id in deadline_uuids:
        print(f" {dag_id:20} | {alert_id:36} | {alert_id:36} | {interval:8}")
        row_count += 1
print(f"({row_count} rows)")
cur.close()
conn.close()
EOF
Expected Output:
Two rows (matching the number of DeadlineAlerts in the dag named two_unique_deadlines). Each row should
show matching deadline_alert_ids in both the uuid_in_serialized_dag and uuid_in_deadline_alert columns.
################## STEP U-15.5B #################
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT sd.dag_id, length(sd.data_compressed) as compressed_bytes, COUNT(da.id) as deadline_count FROM serialized_dag sd LEFT JOIN deadline_alert da ON da.serialized_dag_id = sd.id WHERE sd.data_compressed IS NOT NULL AND sd.dag_id LIKE '%deadline%' GROUP BY sd.dag_id, sd.data_compressed ORDER BY sd.dag_id;"
Expected Output:
7 rows, matching the total_dags from Step U-2. Each listed Dag will have the correct number of deadlines
matching Step U-3.
If you plan on continuing to the compressed downgrade testing, no action is needed.
Otherwise, you likely want to either remove the file created in step U-0 or comment
out the line by adding a # before it.
Phase 4: Validation Summary
Step U-16: Compare migration output with baseline data
Compare the migration console output counts with the Step U-3 baseline counts (an optional scripted cross-check follows this list):
Verify: Processed X Dags matches total_dags from baseline
Verify: Y had DeadlineAlerts matches dags_with_deadlines from baseline
Verify: Migrated Z DeadlineAlert configurations matches total_deadline_alerts from baseline
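If you prefer to recompute the baseline numbers directly from the database instead of reading them off the
console, a sketch along these lines works in the compressed configuration. The connection string is the same
one used throughout this plan; the printed names are only illustrative and are not emitted by Airflow itself:
python3 << 'EOF'
# Optional cross-check sketch: recompute the counts that the migration output should report.
import json
import zlib
import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("SELECT dag_id, data_compressed FROM serialized_dag ORDER BY dag_id")
rows = cur.fetchall()
dags_with_deadlines = 0
for dag_id, data_compressed in rows:
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    if dag_data['dag'].get('deadline'):
        dags_with_deadlines += 1
cur.execute("SELECT count(*) FROM deadline_alert")
total_deadline_alerts = cur.fetchone()[0]
print(f"total_dags={len(rows)}")
print(f"dags_with_deadlines={dags_with_deadlines}")
print(f"total_deadline_alerts={total_deadline_alerts}")
cur.close()
conn.close()
EOF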
Expected Results
✅ Successful Migration Indicators:
All baseline Dag counts match migration output
DeadlineAlert table contains expected number of records
Foreign key constraints are properly created and functional
No errors during migration process
❌ Failure Indicators:
Count mismatches between baseline and migration output
Missing DeadlineAlert records
Foreign key constraint errors
Migration errors or exceptions
Downgrade Testing
Phase 1: Pre-Downgrade Validation
Step D-0: Ensure you are starting from a fully upgraded state
If you are continuing from the Upgrade Test above, you are ready to go. If not, ensure the following
prerequisites are met before continuing:
Database should have the deadline_alert table populated
serialized_dag should contain UUID arrays in the deadline field
All upgrade tests (Steps U-1 through U-15) should have passed
If testing compressed data, ensure that files/airflow-breeze-config/init.sh exists from step U-0
[Optional] Step D-0.5: QoL Step
As above, I recommend you open a new terminal and run breeze exec so you have a fresh full-sized
terminal that is easier to copy/paste. Alternatively, if you want to stay in tmux you may find it
easier to zoom to just the one panel with CTRL-b z (which also undoes it later)
Step D-1: Verify current state has UUID arrays in serialized_dag
NOTE: The command is different but the expected output is identical.
[UNCOMPRESSED ONLY]
############## STEP D-1 #############
psql postgresql://postgres:airflow@postgres/airflow -A -c "SELECT dag_id, jsonb_typeof(data->'dag'->'deadline') as deadline_type, jsonb_array_length(data->'dag'->'deadline') as deadline_count, jsonb_pretty(data->'dag'->'deadline') as deadline_data FROM serialized_dag WHERE data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb ORDER BY dag_id;"
[COMPRESSED ONLY]
############### STEP D-1B ##############
python3 << 'EOF'
import json
import zlib
import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("SELECT dag_id, data_compressed FROM serialized_dag ORDER BY dag_id")
print("dag_id|deadline_type|deadline_count|deadline_data")
row_count = 0
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_data = dag_data['dag']['deadline']
    if deadline_data:
        deadline_type = 'array' if isinstance(deadline_data, list) else 'object'
        deadline_count = len(deadline_data) if isinstance(deadline_data, list) else 1
        print(f"{dag_id}|{deadline_type}|{deadline_count}|{json.dumps(deadline_data, indent=4)}")
        row_count += 1
print(f"({row_count} rows)")
cur.close()
conn.close()
EOF
Expected Output:
All Dags with deadlines should show deadline_type as 'array' and contain UUID strings (not objects).
There should be 5 rows matching dags_with_deadlines from Step U-2 in the upgrade test.
[COMPRESSED ONLY] Step D-2.5B: Verify Data is Still Compressed Before Downgrade
################# STEP D-2.5B ################
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT dag_id, CASE WHEN data_compressed IS NOT NULL THEN 'compressed' WHEN data IS NOT NULL THEN 'uncompressed' END as storage_type FROM serialized_dag ORDER BY dag_id;"
Expected Output:
7 rows, matching the total_dags from Step D-2, each showing "compressed". The output should be identical to
Step U-8.5B.
No errors (some deprecation warnings may be printed depending on your environment and are acceptable)
The printed counts should be accurate and match the output of Step U-2 (7 dags, 5 have DeadlineAlerts, 5 total configurations)
Processing batch 2... should not be printed
The last line should say "Downgrade complete"
Performing downgrade with database postgresql+psycopg2://postgres:***@postgres/airflow
Warning: About to reverse schema migrations for the airflow metastore. Please ensure you have backed up your database before any upgrade or downgrade operation. Proceed? (y/n)
y
2025-11-01T03:29:39.332589Z [info ] Attempting downgrade to revision b87d2135fa50 [airflow.utils.db] loc=db.py:1205
2025-11-01T03:29:39.399138Z [info ] Applying downgrade migrations to Airflow database. [airflow.utils.db] loc=db.py:1218
2025-11-01T03:29:39.435089Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:29:39.435300Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:29:39.486819Z [info ] Running downgrade 55297ae24532 -> b87d2135fa50, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Using migration_batch_size of 10000 as set in Airflow configuration.
Starting downgrade of 5 Dags with DeadlineAlerts in 1 batches.
Processing batch 1...
Batch 1 of 1 complete.
Processed 7 serialized_dag records (7 unique Dags), 5 had DeadlineAlerts.
Restored 6 DeadlineAlert configurations to original format.
No Dags encountered errors during downgrade.
Downgrade complete
Phase 3: Validate Downgrade Results
Step D-5: Verify deadline_alert table was dropped
############# PHASE 3 #############
############## STEP D-5 #############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT tablename FROM pg_tables WHERE tablename = 'deadline_alert';"
Expected Output:
The deadline_alert table should not exist after the downgrade.
tablename
-----------
(0 rows)
Step D-6: Verify data was restored to original format
NOTE: The command is different but the expected output is identical.
[UNCOMPRESSED ONLY]
############## STEP D-6 #############
psql postgresql://postgres:airflow@postgres/airflow -A -c "SELECT dag_id, CASE WHEN data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') THEN 'HAS_DEADLINE' ELSE 'NO_DEADLINE' END as status, jsonb_pretty(data->'dag'->'deadline') as deadline_data FROM serialized_dag WHERE data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb AND jsonb_typeof(data->'dag'->'deadline') IN ('object', 'array') ORDER BY dag_id;"
[COMPRESSED ONLY]
############### STEP D-6B ##############
python3 << 'EOF'
import json
import zlib
import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("""
    SELECT dag_id, data_compressed
    FROM serialized_dag
    ORDER BY dag_id
""")
row_count = 0
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_data = dag_data['dag']['deadline']
    if deadline_data:
        if row_count == 0:
            print(f"dag_id|status|deadline_data")
        print(f"{dag_id}|HAS_DEADLINE|{json.dumps(deadline_data, indent=2)}")
        print()
        row_count += 1
print(f"({row_count} row{'s' if row_count != 1 else ''})")
cur.close()
conn.close()
EOF
Expected Output For Both:
This output should EXACTLY match the output saved from Step U-6 (an optional scripted comparison sketch follows
the sample output below). There should be 5 rows, matching the dags_with_alerts from Step D-2, with the full
object structure (not UUID strings).
One row should show the complete callback structure with __data__, __version__, and __classname__ fields. Note
that the order in which they appear is not important as long as they are all present.
dag_id|full_callback_structure
unique_deadline_one_of_two|{
"__data__": {
"path": "airflow.providers.slack.notifications.slack_webhook.SlackWebhookNotifier",
"kwargs": {
"text": "Deadline in the past; Alert should trigger immediately!"
}
},
"__version__": 0,
"__classname__": "airflow.sdk.definitions.deadline.AsyncCallback"
}
(1 row)
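If you saved the snapshot file suggested after Step U-6, a quick way to confirm the exact match is to rebuild
the snapshot now and compare the two. This is only a sketch: it assumes the same hypothetical
/files/step_u6_deadlines.json path and the compressed configuration:
python3 << 'EOF'
# Optional sketch, paired with the snapshot saved after Step U-6: rebuild the deadline
# snapshot from the current database and compare it to the saved file.
import json
import zlib
import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("SELECT dag_id, data_compressed FROM serialized_dag ORDER BY dag_id")
current = {}
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline = dag_data['dag'].get('deadline')
    if deadline:
        current[dag_id] = deadline
cur.close()
conn.close()

# Hypothetical path written by the sketch after Step U-6.
with open('/files/step_u6_deadlines.json') as fh:
    saved = json.load(fh)

print("MATCH" if current == saved else "MISMATCH: deadline data differs from the Step U-6 snapshot")
EOF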
Phase 4: Re-run Upgrade to Verify Idempotency
Step D-11: Run upgrade migration again
############# PHASE 4 #############
############## STEP D-11 ##############
airflow db migrate
Expected Output:
The migration should run successfully again, producing the same results as the first upgrade.
DB: postgresql+psycopg2://postgres:***@postgres/airflow
Performing upgrade to the metadata database postgresql+psycopg2://postgres:***@postgres/airflow
2025-11-01T03:36:22.259966Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:36:22.260149Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:36:22.268030Z [info ] Migrating the Airflow database [airflow.utils.db] loc=db.py:1131
2025-11-01T03:36:22.280921Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:36:22.281204Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
2025-11-01T03:36:22.331632Z [info ] Running upgrade b87d2135fa50 -> 55297ae24532, Add required fields to enable UI integrations for the Deadline Alerts feature. [alembic.runtime.migration] loc=migration.py:622
Using migration_batch_size of 10000 as set in Airflow configuration.
Starting migration of 7 Dags in 1 batches.
Processing batch 1...
Batch 1 of 1 complete.
Processed 7 serialized_dag records (7 unique Dags), 5 had DeadlineAlerts.
Migrated 6 DeadlineAlert configurations.
No Dags encountered errors during migration.
2025-11-01T03:36:22.377244Z [info ] Context impl PostgresqlImpl. [alembic.runtime.migration] loc=migration.py:211
2025-11-01T03:36:22.377478Z [info ] Will assume transactional DDL. [alembic.runtime.migration] loc=migration.py:214
Database migrating done!
Step D-12: Verify deadline_alert_id arrays were recreated
NOTE: The command is different but the expected output is identical.
[UNCOMPRESSED ONLY]
############### STEP D-12 ##############
psql postgresql://postgres:airflow@postgres/airflow -c "SELECT dag_id, jsonb_typeof(data->'dag'->'deadline') as deadline_type, jsonb_array_length(data->'dag'->'deadline') as deadline_count FROM serialized_dag WHERE data->'dag'->'deadline' IS NOT NULL AND data->'dag'->'deadline' != 'null'::jsonb ORDER BY dag_id;"
[COMPRESSED ONLY]
################ STEP D-12B ###############
python3 << 'EOF'
import json
import zlib
import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("SELECT dag_id, data_compressed FROM serialized_dag ORDER BY dag_id")
print(" dag_id | deadline_type | deadline_count")
print("-------------------------------+---------------+----------------")
row_count = 0
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    deadline_data = dag_data['dag']['deadline']
    if deadline_data:
        deadline_type = 'array' if isinstance(deadline_data, list) else 'object'
        deadline_count = len(deadline_data) if isinstance(deadline_data, list) else 1
        print(f" {dag_id:29} | {deadline_type:13} | {deadline_count:14}")
        row_count += 1
print(f"({row_count} rows)")
cur.close()
conn.close()
EOF
Expected Output:
All 5 Dags should show array type with UUID strings (matching Step D-1 output).
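As an additional optional check of the re-upgrade, the sketch below (compressed configuration, same connection
string as the other scripts in this plan) confirms that every UUID stored in a Dag's deadline array resolves to
a row in the recreated deadline_alert table:
python3 << 'EOF'
# Optional sketch: flag any UUID in a Dag's deadline array that has no matching
# deadline_alert row after the re-upgrade.
import json
import zlib
import psycopg2

conn = psycopg2.connect("postgresql://postgres:airflow@postgres/airflow")
cur = conn.cursor()
cur.execute("SELECT id::text FROM deadline_alert")
alert_ids = {row[0] for row in cur.fetchall()}
cur.execute("SELECT dag_id, data_compressed FROM serialized_dag ORDER BY dag_id")
dangling = 0
for dag_id, data_compressed in cur.fetchall():
    dag_data = json.loads(zlib.decompress(bytes(data_compressed)))
    for uuid_str in dag_data['dag'].get('deadline') or []:
        if uuid_str not in alert_ids:
            dangling += 1
            print(f"MISSING: {dag_id} references {uuid_str}")
print(f"({dangling} dangling references)")
cur.close()
conn.close()
EOF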
✅ deadline_alert table was properly dropped and recreated
✅ All deadline columns were removed and re-added
✅ Re-upgrade produced identical results to first upgrade
✅ Multi-deadline Dag structure preserved through full cycle
✅ Callback structure with __classname__, __version__, and __data__ restored correctly
Expected Results
✅ Successful Downgrade Indicators:
Downgrade migration completes without errors
deadline_alert table is dropped
serialized_dag data exactly matches pre-upgrade format
All deadline alert objects have correct nested structure
Re-upgrade produces identical results to first upgrade
No data loss or corruption
❌ Failure Indicators:
Downgrade migration errors or exceptions
serialized_dag data doesn't match original format
Missing or incorrect __type, __var, or __classname__ fields
Interval values changed (e.g., 1.0 became 1)
Multi-deadline arrays corrupted or missing elements
Re-upgrade produces different results
Appendix 1: Sample Dags
Ensure there are at least three Dags, one with a single DeadlineAlert, one with multiple DeadlineAlerts, and
one without. Below is the Dag file I used for the testing. The callback will fail without a Slack connection
configured, but that is fine for our testing purposes.
from datetime import timedelta, datetime

from airflow import DAG
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import task
from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, AsyncCallback


@task.bash(task_id='sleep_task')
def sleep_10_secs():
    return 'sleep 10'


with DAG(dag_id='no_deadline_1'):
    EmptyOperator(task_id='hello_world')

with DAG(dag_id='no_deadline_2'):
    EmptyOperator(task_id='hello_world_again')

with DAG(
    dag_id="unique_deadline_one_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.FIXED_DATETIME(datetime(1980, 8, 10, 2)),
        interval=timedelta(0),
        callback=AsyncCallback(
            SlackWebhookNotifier,
            {"text": "Deadline in the past; Alert should trigger immediately!"},
        )
    )
):
    sleep_10_secs()

with DAG(
    dag_id="unique_deadline_two_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=20),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={"text": "Long deadline and a short task; Alert should not trigger."},
        )
    )
):
    EmptyOperator(task_id='empty_task')

with DAG(
    dag_id="two_unique_deadlines",
    deadline=[
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(seconds=30),
            callback=AsyncCallback(
                callback_callable=SlackWebhookNotifier,
                kwargs={"text": "Early warning."},
            )
        ),
        DeadlineAlert(
            reference=DeadlineReference.DAGRUN_QUEUED_AT,
            interval=timedelta(seconds=40),
            callback=AsyncCallback(
                callback_callable=SlackWebhookNotifier,
                kwargs={"text": "Last call."},
            )
        ),
    ]
):
    EmptyOperator(task_id='empty_task')

with DAG(
    dag_id="duplicate_deadline_one_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=1),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "},
        )
    )
):
    sleep_10_secs()

with DAG(
    dag_id="duplicate_deadline_two_of_two",
    deadline=DeadlineAlert(
        reference=DeadlineReference.DAGRUN_QUEUED_AT,
        interval=timedelta(seconds=1),
        callback=AsyncCallback(
            callback_callable=SlackWebhookNotifier,
            kwargs={"text": "Short deadline with long task; Alert should trigger! {{ dag_run.dag_id }} "},
        )
    )
):
    sleep_10_secs()