Last active
September 8, 2021 20:22
-
-
Save PaoloLeonard/104494d1a0c01ad7891fd934c684c447 to your computer and use it in GitHub Desktop.
Table metric implementation for the Great Expectations (GE) table expectation tutorial.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Dict, Tuple, Any | |
from great_expectations.core.batch_spec import PathBatchSpec | |
from great_expectations.execution_engine import ( | |
SparkDFExecutionEngine, | |
PandasExecutionEngine | |
) | |
from great_expectations.expectations.metrics.metric_provider import metric_value | |
from great_expectations.expectations.metrics.table_metric_provider import ( | |
TableMetricProvider, | |
) | |
def _load_other_table(
    execution_engine: Any,
    metric_domain_kwargs: Dict,
    reader_method: str,
) -> Any:
    """Load the table named by ``table_filename`` through ``execution_engine``.

    Args:
        execution_engine: Engine whose ``get_batch_data`` reads the file.
        metric_domain_kwargs: Domain kwargs; must hold a non-empty
            ``"table_filename"`` entry with the path of the other table.
        reader_method: Reader name placed in the batch spec
            (``"read_csv"`` for pandas, ``"csv"`` for Spark).

    Returns:
        The ``dataframe`` attribute of the batch data returned by the engine.

    Raises:
        ValueError: If ``"table_filename"`` is missing or empty.
    """
    other_table_filename = metric_domain_kwargs.get("table_filename")
    # Fail early with a clear message instead of handing ``path=None`` to the
    # engine and surfacing an opaque reader error later.
    if not other_table_filename:
        raise ValueError(
            "The 'table.row_count_other' metric requires a non-empty "
            "'table_filename' entry in metric_domain_kwargs."
        )

    batch_spec = PathBatchSpec(
        {"path": other_table_filename, "reader_method": reader_method}
    )
    batch_data = execution_engine.get_batch_data(batch_spec=batch_spec)
    return batch_data.dataframe


class OtherTableRowCount(TableMetricProvider):
    """MetricProvider class to get row count from different tables than the current one.

    The other table is a CSV file whose path is read from the
    ``"table_filename"`` entry of ``metric_domain_kwargs``.
    """

    metric_name = "table.row_count_other"

    @metric_value(engine=PandasExecutionEngine)
    def _pandas(
        cls,
        execution_engine: "PandasExecutionEngine",
        metric_domain_kwargs: Dict,
        metric_value_kwargs: Dict,
        metrics: Dict[Tuple, Any],
        runtime_configuration: Dict,
    ) -> int:
        """Return the row count of the other table, read with ``read_csv``.

        Raises:
            ValueError: If ``"table_filename"`` is missing or empty.
        """
        df = _load_other_table(execution_engine, metric_domain_kwargs, "read_csv")
        return df.shape[0]

    @metric_value(engine=SparkDFExecutionEngine)
    def _spark(
        cls,
        execution_engine: "SparkDFExecutionEngine",
        metric_domain_kwargs: Dict,
        metric_value_kwargs: Dict,
        metrics: Dict[Tuple, Any],
        runtime_configuration: Dict,
    ) -> int:
        """Return the row count of the other table, read with Spark's ``csv``.

        Raises:
            ValueError: If ``"table_filename"`` is missing or empty.
        """
        # NOTE(review): no reader options are passed here. Spark's csv reader
        # presumably does not treat the first line as a header by default,
        # while pandas ``read_csv`` does, so the two engines may differ by one
        # row on files with a header line -- confirm against the data used.
        df = _load_other_table(execution_engine, metric_domain_kwargs, "csv")
        return df.count()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment