Created
July 23, 2019 13:55
-
-
Save BasPH/750fe3d7e63bd15392b74282fe04c085 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Test the validity of all DAGs.""" | |
import glob | |
from os import path | |
import pytest | |
from airflow import models as airflow_models | |
DAG_PATHS = glob.glob(path.join(path.dirname(__file__), "..", "..", "dags", "*.py")) | |
@pytest.mark.parametrize("dag_path", DAG_PATHS) | |
def test_dag_integrity(dag_path): | |
"""Import DAG files and check for a valid DAG instance.""" | |
dag_name = path.basename(dag_path) | |
module = _import_file(dag_name, dag_path) | |
# Validate if there is at least 1 DAG object in the file | |
assert any(isinstance(var, airflow_models.DAG) for var in vars(module).values()) | |
# For every DAG object, test for cycles | |
for dag in [var for var in vars(module).values() if isinstance(var, airflow_models.DAG)]: | |
dag.test_cycle() | |
def _import_file(module_name, module_path): | |
import importlib.util | |
spec = importlib.util.spec_from_file_location(module_name, str(module_path)) | |
module = importlib.util.module_from_spec(spec) | |
spec.loader.exec_module(module) | |
return module |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment