Great Expectations
Name: | flytekitplugins-great_expectations |
Version: | 0.0.0+develop |
Author: | admin@flyte.org |
Provides: | flytekitplugins.great_expectations |
Requires: | flytekit>=1.5.0 great-expectations>=0.13.30,<1.0.0 sqlalchemy>=1.4.23 pyspark==3.3.2 s3fs<2023.6.0 |
Python: | >=3.9 |
License: | apache2 |
Source Code: | https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-greatexpectations |
- Intended Audience :: Science/Research
- Intended Audience :: Developers
- License :: OSI Approved :: Apache Software License
- Programming Language :: Python :: 3.9
- Programming Language :: Python :: 3.10
- Topic :: Scientific/Engineering
- Topic :: Scientific/Engineering :: Artificial Intelligence
- Topic :: Software Development
- Topic :: Software Development :: Libraries
- Topic :: Software Development :: Libraries :: Python Modules
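Great Expectations helps enforce data quality by validating datasets against a suite of expectations. The plugin lets you use Great Expectations in Flyte in two ways: as a task (GreatExpectationsTask) or as a type (GreatExpectationsType).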
To install the plugin, run the following command:
pip install flytekitplugins-great-expectations
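After installing, one quick way to confirm that the plugin is visible to your interpreter is to look up its module. The following is a minimal sketch that uses only the standard library. The helper name `is_importable` is hypothetical and is not part of the plugin.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if `module_name` can be located on the import path.

    find_spec imports parent packages (for example `flytekitplugins`)
    but does not execute the target module itself.
    """
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package of a dotted name is missing.
        return False


if __name__ == "__main__":
    name = "flytekitplugins.great_expectations"
    print(f"{name} importable: {is_importable(name)}")
```

If the check reports that the module is not importable, the install most likely went into a different environment than the one running the script.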
Task Example
import os

import pandas as pd
from flytekit import Resources, kwtypes, task, workflow
from flytekitplugins.great_expectations import BatchRequestConfig, GreatExpectationsTask

# Validates the dataset passed as `dataset` against the "test.demo" suite.
simple_task_object = GreatExpectationsTask(
    name="great_expectations_task_simple",
    datasource_name="data",
    inputs=kwtypes(dataset=str),
    expectation_suite_name="test.demo",
    data_connector_name="data_example_data_connector",
    context_root_dir="great_expectations",
)


@task(limits=Resources(mem="500Mi"))
def simple_task(csv_file: str) -> int:
    # Run the validation first; `result` holds its outcome.
    result = simple_task_object(dataset=csv_file)
    # Load the same file and return its row count.
    df = pd.read_csv(os.path.join("greatexpectations", "data", csv_file))
    return df.shape[0]


@workflow
def simple_wf(dataset: str = "yellow_tripdata_sample_2019-01.csv") -> int:
    return simple_task(csv_file=dataset)
Type Example
from flytekit import task, workflow
from flytekitplugins.great_expectations import (
    BatchRequestConfig,
    GreatExpectationsFlyteConfig,
    GreatExpectationsType,
)


# The function must be a Flyte task so it can be called from the workflow.
@task
def simple_task(
    directory: GreatExpectationsType[
        str,
        GreatExpectationsFlyteConfig(
            datasource_name="data",
            expectation_suite_name="test.demo",
            data_connector_name="my_data_connector",
            batch_request_config=BatchRequestConfig(
                # Select the January 2019 batches, at most 10 of them.
                data_connector_query={
                    "batch_filter_parameters": {
                        "year": "2019",
                        "month": "01",
                    },
                    "limit": 10,
                },
            ),
            context_root_dir="great_expectations",
        ),
    ]
) -> str:
    return f"Validation works for {directory}!"


@workflow
def simple_wf(directory: str = "my_assets") -> str:
    return simple_task(directory=directory)
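The `data_connector_query` in the Type Example combines `batch_filter_parameters` with a `limit`. As a rough illustration of that selection logic, the sketch below filters a list of batch identifiers in plain Python: it keeps the batches whose identifiers match every filter, then caps the count. This is not the Great Expectations implementation. The function `select_batches` and the sample identifiers are hypothetical and exist only for this example.

```python
from typing import Dict, List, Optional


def select_batches(
    batches: List[Dict[str, str]],
    batch_filter_parameters: Optional[Dict[str, str]] = None,
    limit: Optional[int] = None,
) -> List[Dict[str, str]]:
    """Keep batches whose identifiers match every filter, then cap the count."""
    filters = batch_filter_parameters or {}
    matched = [
        batch
        for batch in batches
        if all(batch.get(key) == value for key, value in filters.items())
    ]
    return matched if limit is None else matched[:limit]


# Hypothetical batch identifiers, such as a data connector might derive
# from file names like "yellow_tripdata_sample_2019-01.csv".
available = [
    {"year": "2019", "month": "01"},
    {"year": "2019", "month": "02"},
    {"year": "2020", "month": "01"},
]

selected = select_batches(
    available,
    batch_filter_parameters={"year": "2019", "month": "01"},
    limit=10,
)
print(selected)
```

Here only one batch matches both filters, so the `limit` of 10 has no effect. It matters only when the filters match more batches than the limit allows.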
More examples can be found in the documentation.