Feliks Krawczyk
08/29/2019, 1:57 AM

```python
from prefect import task, Flow

# create_spark_cluster, submit_job, and tear_down are placeholder
# helpers from the example, not Prefect built-ins

@task
def create_cluster():
    cluster = create_spark_cluster()
    return cluster

@task
def run_spark_job(cluster):
    submit_job(cluster)

@task
def tear_down_cluster(cluster):
    tear_down(cluster)

with Flow("Spark") as flow:
    # define data dependencies
    cluster = create_cluster()
    submitted = run_spark_job(cluster)
    tear_down_cluster(cluster)

    # wait for the job to finish before tearing down the cluster
    tear_down_cluster.set_upstream(submitted)
```
Wouldn’t you also need to add `submitted.set_upstream(cluster)` to ensure this works? Or is there some hidden logic that, when you define a flow, runs the lines in order, so `submitted` runs only after `cluster`? If that is the case, how do you get tasks to run in parallel?

Chris White
Calling `submitted = run_spark_job(cluster)` sets `cluster` as an upstream dependency of `submitted` automatically.
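In other words, the "hidden logic" is that calling a task inside a `Flow` context registers both a data edge and an ordering edge. A rough sketch of the imperative equivalent, continuing from the snippet above and assuming Prefect 0.x's `Flow.set_dependencies` API:

```python
# Sketch: roughly what `submitted = run_spark_job(cluster)` does under
# the hood, expressed imperatively (Prefect 0.x API, assumed)
flow = Flow("Spark-imperative")
flow.set_dependencies(
    task=run_spark_job,
    upstream_tasks=[create_cluster],             # ordering edge
    keyword_tasks=dict(cluster=create_cluster),  # data edge: result bound to `cluster`
)
```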
Chris White
You can run `flow.visualize()` on the flow above to see what it looks like and how the dependencies have been set.
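If graphviz isn't available for `flow.visualize()`, the dependency graph can also be inspected programmatically; a minimal sketch, assuming the Prefect 0.x `Flow.edges` and `Flow.sorted_tasks()` attributes:

```python
# Print each dependency edge in the flow built above
for edge in flow.edges:
    print(f"{edge.upstream_task} -> {edge.downstream_task}")

# Topological order: each task appears only after all of its upstream tasks
print([t.name for t in flow.sorted_tasks()])
```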
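On the parallelism question raised above: tasks with no dependency path between them are free to run concurrently once the flow is executed with a parallel executor. A minimal sketch, assuming Prefect 0.x's `DaskExecutor`; the tasks `extract_a`, `extract_b`, and `combine` are hypothetical, for illustration only:

```python
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor  # Prefect 0.x import path (assumed)

@task
def extract_a():
    return "a"

@task
def extract_b():
    return "b"

@task
def combine(a, b):
    return a + b

with Flow("parallel-example") as flow:
    # extract_a and extract_b share no edge, so a parallel
    # executor may run them at the same time
    a = extract_a()
    b = extract_b()
    combine(a, b)

# The default executor runs tasks sequentially; DaskExecutor
# schedules independent tasks in parallel
flow.run(executor=DaskExecutor())
```

With the default synchronous executor the same flow still runs correctly, just one task at a time; only the executor choice changes whether independent branches overlap.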