Example Dagster pipeline running on Azure Databricks
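The file below defines a single `databricks` mode that wires four resources together: the `databricks_pyspark_step_launcher`, which submits each solid as a run on an existing Databricks cluster; a `pyspark` resource carrying the Spark configuration; an `adls2` resource for Azure Data Lake Storage Gen2 access; and an ADLS2 pickle IO manager that persists step outputs so they can be exchanged between the local process and the cluster. The storage account key and the Databricks token live in a Databricks secret scope (backed by Azure KeyVault) and are surfaced to the run as environment variables.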
from pathlib import Path

from dagster import pipeline, solid, repository, execute_pipeline
from dagster.core.definitions.mode import ModeDefinition
from dagster_azure.adls2 import adls2_resource
from dagster_azure.adls2.io_manager import adls2_pickle_io_manager
from dagster_databricks import databricks_pyspark_step_launcher
from dagster_pyspark import pyspark_resource
databricks_mode = ModeDefinition(
    name="databricks",
    resource_defs={
        "pyspark_step_launcher": databricks_pyspark_step_launcher.configured(
            {
                "run_config": {
                    "run_name": "test dagster",
                    "cluster": {"existing": "[cluster-id]"},  # from the Databricks URL
                    "libraries": [
                        {"pypi": {"package": "dagster-azure==0.12.1"}},
                        {"pypi": {"package": "dagster-databricks==0.12.1"}},
                    ],
                },
                "databricks_host": "https://[cluster url].azuredatabricks.net/",
                "databricks_token": {
                    "env": "DB_TOKEN"  # must also be available in the Databricks environment
                },
                "local_pipeline_package_path": str(Path(__file__).parent.parent),
                "secrets_to_env_variables": [
                    {  # env var created in the Databricks runtime with the value of the secret
                        "name": "DUMMY_KEY",  # must match the env key used elsewhere in this file
                        "key": "[same as storage_account_key_key]",
                        "scope": "[same as secret_scope]",
                    },
                    {  # env var created in the Databricks runtime with the value of the secret
                        "name": "DB_TOKEN",  # must match the env key used elsewhere in this file
                        "key": "[key for the DATABRICKS_TOKEN in KeyVault]",
                        "scope": "[secret_scope for the key]",
                    },
                ],
                "storage": {
                    "adls2": {
                        "storage_account_name": "[storage account name]",
                        "storage_account_key_key": "[KeyVault key name where the primary/secondary Azure storage key is stored]",  # list it with the Databricks CLI: `databricks secrets list --scope [scope name]`
                        "secret_scope": "[secret scope]",
                    }
                },
            }
        ),
        "pyspark": pyspark_resource.configured(
            {
                "spark_conf": {
                    "spark.executor.memory": "2g",
                    "fs.azure.account.key.[storage account name].dfs.core.windows.net": {
                        "env": "DUMMY_KEY"
                    },  # primary/secondary Azure storage key
                }
            }
        ),
        "adls2": adls2_resource.configured(
            {
                "credential": {"key": {"env": "DUMMY_KEY"}},  # primary/secondary Azure storage key
                "storage_account": "[storage account name]",
            }
        ),
        "io_manager": adls2_pickle_io_manager.configured(
            {"adls2_file_system": "[Blob container name]", "adls2_prefix": "[Blob path]"}
        ),
    },
)


@solid(required_resource_keys={"pyspark_step_launcher", "pyspark"})
def test(context):
    context.resources.pyspark.spark_session.createDataFrame([(1, 2), (2, 1)], "a: int, b: int")


@pipeline(mode_defs=[databricks_mode])
def my_pipeline():
    test()


@repository
def databricks_example():
    return [my_pipeline]


if __name__ == "__main__":
    execute_pipeline(my_pipeline)
# Run with: env DAGSTER_HOME=$PWD DUMMY_KEY=something DB_TOKEN=something dagster pipeline execute -f dagster_databricks.py
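For quick iteration without a cluster, the same solid can run on a local SparkSession by registering a second mode. The sketch below is not part of the original gist; it assumes the same dagster 0.12.x APIs used above (where no_step_launcher and fs_io_manager are available), and the local_mode name is illustrative.

from dagster import fs_io_manager
from dagster.core.definitions.no_step_launcher import no_step_launcher

# Hypothetical local mode (not in the original gist): runs the solid
# in-process on a local SparkSession instead of launching it on Databricks,
# and stores step outputs on the local filesystem instead of ADLS2.
local_mode = ModeDefinition(
    name="local",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.executor.memory": "1g"}}),
        "io_manager": fs_io_manager,
    },
)

# Register both modes on the pipeline, then pick one at execution time:
#     @pipeline(mode_defs=[databricks_mode, local_mode])
#     execute_pipeline(my_pipeline, mode="local")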