Running a Spark Task
Once you have a Union account, install `union`:
pip install union
Export the following environment variable to build and push images to your own container registry:
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"
Then run the following commands to execute the workflow:
git clone https://github.com/unionai/unionai-examples
cd unionai-examples
union run --remote tutorials/sentiment_classifier/sentiment_classifier.py main --model distilbert-base-uncased
The source code for this tutorial can be found on GitHub.
import datetime
import random
from operator import add
import flytekit
from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.spark import Spark
Create an `ImageSpec` to automate the retrieval of a prebuilt Spark image.
custom_image = ImageSpec(python_version="3.9", registry="ghcr.io/flyteorg", packages=["flytekitplugins-spark"])
Replace `ghcr.io/flyteorg` with a container registry you have access to publish to. To upload the image to the local registry in the demo cluster, set the registry to `localhost:30000`.
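For example, a minimal sketch of the same `ImageSpec` pointed at the demo cluster's local registry (the registry value is the only change):
custom_image = ImageSpec(
    python_version="3.9",
    registry="localhost:30000",  # in-cluster registry of the local demo cluster
    packages=["flytekitplugins-spark"],
)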
To create a Spark task, add a `flytekitplugins.spark.Spark` config to the Flyte task. The `spark_conf` parameter accepts the configuration options commonly used when setting up a Spark cluster. If necessary, you can also provide `hadoop_conf` as an input.
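As a hypothetical sketch of how both dictionaries could be passed together (the keys and values below are illustrative, not required by this example):
Spark(
    spark_conf={"spark.executor.instances": "2"},
    # Illustrative Hadoop setting; replace with whatever your storage backend needs
    hadoop_conf={"fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"},
)
The task in this example sets only `spark_conf`: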
@task(
    task_config=Spark(
        # This configuration is applied to the Spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
            "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
        }
    ),
    limits=Resources(mem="2000M"),
    container_image=custom_image,
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))

    n = 1 * partitions
    sess = flytekit.current_context().spark_session
    count = sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    pi_val = 4.0 * count / n
    return pi_val
The `hello_spark` task spins up a new Spark cluster. When executed locally, it sets up a single-node, client-only cluster; when executed remotely, it dynamically scales the cluster size based on the specified Spark configuration. For this example, we define the function that the map step applies to each sample within the Spark cluster.
def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0
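The estimate works because `f` draws a uniform random point in the square [-1, 1] × [-1, 1] and returns 1 when the point lands inside the unit circle; the fraction of hits approximates π/4, which is why `hello_spark` multiplies by 4. As a quick sanity check outside Spark (a plain-Python sketch, not part of the workflow):
samples = 100_000  # illustrative sample count
hits = sum(f(i) for i in range(samples))
print("Monte Carlo estimate of pi:", 4.0 * hits / samples)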
Additionally, we specify a standard Flyte task that won’t be executed on the Spark cluster.
@task(
    cache_version="2",
    container_image=custom_image,
)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1
Finally, define a workflow that connects your tasks in a sequence. Remember, Spark and non-Spark tasks can be chained together as long as their parameter specifications match.
@workflow
def my_spark(triggered_date: datetime.datetime = datetime.datetime(2020, 9, 11)) -> float:
    """
    This workflow is used like any other workflow. Since the image is a property of the task,
    the workflow does not care about how the image is configured.
    """
    pi = hello_spark(partitions=1)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi
You can execute the workflow locally. Certain aspects of Spark, such as links to {ref}`Hive <Hive>` metastores, may not work during local execution, but these limitations are inherent to Spark and are not introduced by Flyte.
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(
        f"Running my_spark(triggered_date=datetime.datetime.now()) {my_spark(triggered_date=datetime.datetime.now())}"
    )