SQLite3
Once you have a Union account, install the union package:
pip install union
Export the following environment variable to build and push images to your own container registry:
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"
Then run the following commands to clone the examples repository and run the workflow remotely:
git clone https://github.com/unionai/unionai-examples
cd unionai-examples
union run --remote <path/to/this/example.py> wf
The source code for this tutorial is available on GitHub.
import pandas
from flytekit import kwtypes, task, workflow
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
SQLite3 queries in Flyte produce a schema (FlyteSchema) output. The data in this example is the Chinook sample database, downloaded as a zip archive from the SQLite Tutorial site:
from flytekit.types.schema import FlyteSchema
EXAMPLE_DB = "https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip"
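Because EXAMPLE_DB points to a .zip archive, the task below is configured with compressed=True. As a rough, hypothetical sketch of what that setting implies (this is not flytekit's implementation), the database file is first extracted from the archive and then opened with Python's built-in sqlite3 module. The helper name open_zipped_sqlite is made up for illustration, and it assumes the archive contains exactly one file.

```python
import sqlite3
import zipfile


def open_zipped_sqlite(zip_path: str, workdir: str) -> sqlite3.Connection:
    """Extract a zipped SQLite database into workdir and open it.

    Hypothetical helper for illustration only; assumes the archive
    contains exactly one file, the database itself.
    """
    with zipfile.ZipFile(zip_path) as archive:
        # Ignore directory entries and keep only real files.
        members = [name for name in archive.namelist() if not name.endswith("/")]
        if len(members) != 1:
            raise ValueError(f"expected exactly one file in {zip_path}, found {members}")
        # ZipFile.extract returns the path of the extracted file.
        db_path = archive.extract(members[0], path=workdir)
    return sqlite3.connect(db_path)
```

To try it against the real archive, you would first download it locally, for example with urllib.request.urlretrieve(EXAMPLE_DB, "chinook.zip").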
Here the SQLite3 task is declared at module level, like a regular task. Alternatively, it can be declared inline within a workflow, at the point where it is used (an example follows later).
sql_task = SQLite3Task(
    name="sqlite3.sample",
    query_template="select TrackId, Name from tracks limit {{.inputs.limit}}",
    inputs=kwtypes(limit=int),
    output_schema_type=FlyteSchema[kwtypes(TrackId=int, Name=str)],
    task_config=SQLite3Config(uri=EXAMPLE_DB, compressed=True),
)
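The query_template above contains a {{.inputs.limit}} placeholder that Flyte fills in from the task's declared inputs. The following is a minimal, hypothetical sketch of that substitution, followed by running the rendered query with pandas against a SQLite connection. render_query_template and run_query are illustrative names, not flytekit APIs, and the regular expression only approximates Flyte's own templating.

```python
import re
import sqlite3

import pandas

# Matches placeholders of the form {{.inputs.<name>}}, capturing <name>.
PLACEHOLDER = re.compile(r"\{\{\s*\.inputs\.(\w+)\s*\}\}")


def render_query_template(template: str, **inputs) -> str:
    """Hypothetical helper: replace each {{.inputs.<name>}} with str(inputs[name])."""

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in inputs:
            raise KeyError(f"missing input: {name}")
        return str(inputs[name])

    return PLACEHOLDER.sub(substitute, template)


def run_query(conn: sqlite3.Connection, template: str, **inputs) -> pandas.DataFrame:
    """Render the template and return the result set as a DataFrame."""
    return pandas.read_sql_query(render_query_template(template, **inputs), conn)
```

Plain string substitution is tolerable here only because limit is declared as an int; for free-form string inputs, bound parameters (sqlite3's ? placeholders) would be the safer choice.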
As described elsewhere, a FlyteSchema can be received directly as a pandas DataFrame; Flyte converts it automatically.
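@task
def print_and_count_columns(df: pandas.DataFrame) -> int:
    # Despite the name, this neither prints nor counts columns: it returns the
    # length of the first column, i.e. the number of rows returned by the query.
    return len(df[df.columns[0]])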
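Because the body of print_and_count_columns is ordinary pandas code, its logic can be checked on a hand-built DataFrame shaped like the declared schema (TrackId: int, Name: str), without running Flyte at all. The sketch below copies that logic into a plain function with the hypothetical name count_rows_in_first_column, and shows that the value is the row count, the same as len(df).

```python
import pandas


def count_rows_in_first_column(df: pandas.DataFrame) -> int:
    # Same expression as in print_and_count_columns: the length of the first
    # column, which equals the number of rows in the DataFrame.
    return len(df[df.columns[0]])


# A small frame with the same columns as the declared FlyteSchema.
sample = pandas.DataFrame({"TrackId": [1, 2, 3], "Name": ["a", "b", "c"]})
print(count_rows_in_first_column(sample))  # prints 3
```

The result is 3 (rows), not 2 (columns), which is why the task returns 100 when the query is run with limit=100 against a table that has at least that many rows.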
The SQLite3 task is called in a workflow like any other task, passing its declared inputs as keyword arguments:
@workflow
def wf() -> int:
    return print_and_count_columns(df=sql_task(limit=100))
The workflow can also be executed locally:
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running main {wf()}")
As mentioned earlier, the SQL task can also be declared inline, inside the workflow that uses it:
@workflow
def query_wf() -> int:
    df = SQLite3Task(
        name="sqlite3.sample_inline",
        query_template="select TrackId, Name from tracks limit {{.inputs.limit}}",
        inputs=kwtypes(limit=int),
        output_schema_type=FlyteSchema[kwtypes(TrackId=int, Name=str)],
        task_config=SQLite3Config(uri=EXAMPLE_DB, compressed=True),
    )(limit=100)
    return print_and_count_columns(df=df)
This workflow can be executed locally in the same way:
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running main {query_wf()}")