Created April 4, 2023 20:29
-
-
Save a-chumagin/b9dd5fa2e5b56eae7353c8944e1de202 to your computer and use it in GitHub Desktop.
GX: checkpoint in parallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import great_expectations as gx | |
from great_expectations.data_context.types.base import DataContextConfig, FilesystemStoreBackendDefaults | |
import os | |
from great_expectations.core.batch import RuntimeBatchRequest | |
import pytest | |
@pytest.fixture
def create_context():
    """Provide a Great Expectations data context configured for the taxi tests.

    The store backend root is ``os.path.abspath("./tripdata")``, i.e. resolved
    against the current working directory at fixture time. The fixture:

    1. builds the context from an in-code ``DataContextConfig``,
    2. registers the ``taxi_datasource`` described by ``datasource_config()``,
    3. writes the resulting project config via ``save_config_to_file``,
    4. adds (or updates) the checkpoint from ``create_checkpoint_config()``.

    Declared without a ``scope`` argument, so pytest runs it once per test.

    NOTE(review): every invocation rewrites the same YAML file; if tests are
    run in parallel (e.g. pytest-xdist) workers may write it concurrently —
    confirm that is acceptable.
    """
    store_defaults = FilesystemStoreBackendDefaults(
        root_directory=os.path.abspath("./tripdata"))
    ctx = gx.get_context(
        project_config=DataContextConfig(store_backend_defaults=store_defaults))

    ctx.add_datasource(**datasource_config())
    save_config_to_file(ctx)
    ctx.add_or_update_checkpoint(**create_checkpoint_config())
    return ctx
def save_config_to_file(context, path="./tripdata/great_expectations.yml"):
    """Serialize ``context.config`` as YAML into ``path``.

    Args:
        context: A data context whose ``config`` attribute provides
            ``to_yaml(outfile)``.
        path: Destination file. The default is the location this helper
            always wrote to: ``./tripdata/great_expectations.yml``, relative
            to the current working directory. Callers can now override it.

    The parent directory is created if it does not already exist, so the
    write does not depend on the store having created ``./tripdata`` first.
    """
    parent_dir = os.path.dirname(path)
    if parent_dir:
        # open(..., "w") raises FileNotFoundError when the directory is missing.
        os.makedirs(parent_dir, exist_ok=True)
    # Explicit encoding keeps the YAML output identical across platforms
    # (the platform default encoding is not UTF-8 everywhere).
    with open(path, "w", encoding="utf-8") as outfile:
        context.config.to_yaml(outfile)
def create_checkpoint_config():
    """Return keyword arguments for registering the test checkpoint.

    The checkpoint is a ``SimpleCheckpoint`` named ``"my_checkpoint"``.
    Its ``run_name_template`` uses strftime-style placeholders
    (``%Y%m%d-%H%M%S``). A new dict is built on every call.
    """
    return dict(
        name="my_checkpoint",
        config_version=1,
        class_name="SimpleCheckpoint",
        run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
    )
def datasource_config():
    """Return keyword arguments for registering ``taxi_datasource``.

    The datasource uses ``PandasExecutionEngine``. It declares two data
    connectors:

    * ``default_runtime_data_connector_name`` is a ``RuntimeDataConnector``
      keyed by the batch identifier ``default_identifier_name``.
    * ``default_inferred_data_connector_name`` is an
      ``InferredAssetFilesystemDataConnector`` rooted at ``./data/``. Its
      regex ``(.*)`` captures the whole file name as ``data_asset_name``.
    """
    engine = {
        "module_name": "great_expectations.execution_engine",
        "class_name": "PandasExecutionEngine",
    }
    runtime_connector = {
        "class_name": "RuntimeDataConnector",
        "module_name": "great_expectations.datasource.data_connector",
        "batch_identifiers": ["default_identifier_name"],
    }
    inferred_connector = {
        "class_name": "InferredAssetFilesystemDataConnector",
        "base_directory": "./data/",
        "default_regex": {"group_names": ["data_asset_name"], "pattern": "(.*)"},
    }
    return {
        "name": "taxi_datasource",
        "class_name": "Datasource",
        "module_name": "great_expectations.datasource",
        "execution_engine": engine,
        "data_connectors": {
            "default_runtime_data_connector_name": runtime_connector,
            "default_inferred_data_connector_name": inferred_connector,
        },
    }
@pytest.fixture
def yellow_taxi_batch_request():
    """Runtime batch request for the yellow-taxi parquet file.

    The batch is addressed through ``taxi_datasource`` and its runtime
    connector, under the asset and batch-identifier value ``yellow_taxi``.
    NOTE(review): the path is relative and presumably resolved against the
    working directory — confirm tests are launched from the project root.
    """
    asset = "yellow_taxi"
    return RuntimeBatchRequest(
        datasource_name="taxi_datasource",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name=asset,
        runtime_parameters={"path": "./data/yellow_tripdata_2022-01.parquet"},
        batch_identifiers={"default_identifier_name": asset},
    )
@pytest.fixture
def green_taxi_batch_request():
    """Runtime batch request for the green-taxi parquet file.

    The batch is addressed through ``taxi_datasource`` and its runtime
    connector, under the asset and batch-identifier value ``green_taxi``.
    NOTE(review): the path is relative and presumably resolved against the
    working directory — confirm tests are launched from the project root.
    """
    asset = "green_taxi"
    return RuntimeBatchRequest(
        datasource_name="taxi_datasource",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name=asset,
        runtime_parameters={"path": "./data/green_tripdata_2022-01.parquet"},
        batch_identifiers={"default_identifier_name": asset},
    )
@pytest.mark.parametrize(
    "expectation_suite_name, taxi_type",
    [
        ("integrity", "green"),
        ("completeness", "green"),
        ("integrity", "yellow"),
        ("completeness", "yellow"),
    ],
)
def test_tax_data_set(create_context, expectation_suite_name, taxi_type, request):
    """Run ``my_checkpoint`` for one (suite, taxi type) pair and require success.

    The batch request is looked up by fixture name
    (``<taxi_type>_taxi_batch_request``) through ``request.getfixturevalue``.
    NOTE(review): assumes the ``integrity`` and ``completeness`` expectation
    suites already exist in the ``./tripdata`` store — confirm.
    """
    batch_request = request.getfixturevalue(f"{taxi_type}_taxi_batch_request")
    result = create_context.run_checkpoint(
        checkpoint_name="my_checkpoint",
        validations=[
            dict(batch_request=batch_request,
                 expectation_suite_name=expectation_suite_name),
        ],
    )
    assert result["success"] is True
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.