# prefect-community
g
Hi 👋 I started playing around with Prefect today as an orchestration tool for our Databricks jobs. I've managed to trigger jobs successfully, which is great, but I cannot figure out how to run tasks in sequence using the Functional API. I've been browsing through the docs but did not find an example of this with Task classes. Here's my flow:
with Flow("application-etl") as flow:
    conn = PrefectSecret("DATABRICKS_CONNECTION_STRING_PRE")

    bronze = DatabricksRunNow(job_id=BRONZE_APPLICATION_JOB_ID, name="bronze")

    silver = DatabricksRunNow(job_id=SILVER_APPLICATION_JOB_ID, name="silver")
    silver.set_upstream(bronze)

    gold = DatabricksRunNow(job_id=GOLD_APPLICATION_JOB_ID, name="gold")
    gold.set_upstream(silver)

    bronze(databricks_conn_secret=conn)
    silver(databricks_conn_secret=conn)
    gold(databricks_conn_secret=conn)

flow.run()
I want to run my tasks in the sequence bronze > silver > gold. When this executes, gold or silver are started before bronze. I don't know if I'm misunderstanding how set_upstream works 🤔 Thanks in advance!
k
Hey @Gustavo Puma, DatabricksRunNow() is only the init; you want to set the dependencies on the run, i.e. on what you get back from calling the task inside the flow:
with Flow("application-etl") as flow:
    conn = PrefectSecret("DATABRICKS_CONNECTION_STRING_PRE")

    bronze = DatabricksRunNow(job_id=BRONZE_APPLICATION_JOB_ID, name="bronze")

    silver = DatabricksRunNow(job_id=SILVER_APPLICATION_JOB_ID, name="silver")

    gold = DatabricksRunNow(job_id=GOLD_APPLICATION_JOB_ID, name="gold")

    a = bronze(databricks_conn_secret=conn)
    b = silver(databricks_conn_secret=conn)
    c = gold(databricks_conn_secret=conn)
    b.set_upstream(a)
    c.set_upstream(b)

flow.run()
and the init doesn’t have to be in the flow
bronze = DatabricksRunNow(job_id=BRONZE_APPLICATION_JOB_ID, name="bronze")

silver = DatabricksRunNow(job_id=SILVER_APPLICATION_JOB_ID, name="silver")

gold = DatabricksRunNow(job_id=GOLD_APPLICATION_JOB_ID, name="gold")

with Flow("application-etl") as flow:
    conn = PrefectSecret("DATABRICKS_CONNECTION_STRING_PRE")

    a = bronze(databricks_conn_secret=conn)
    b = silver(databricks_conn_secret=conn)
    c = gold(databricks_conn_secret=conn)
    b.set_upstream(a)
    c.set_upstream(b)

flow.run()
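To make the "init vs. run" point concrete, here is a toy model in plain Python. It is not Prefect's implementation: ToyFlow, ToyTask, and has_edge are hypothetical names, and the sketch assumes (as I understand Prefect 1.x to behave) that calling a task inside a flow returns a new copy of it. Under that assumption, a dependency set on the original object does not carry over to the copy that actually receives the arguments.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in for a flow: stores called tasks and edges."""

    def __init__(self):
        self.tasks = []
        self.edges = []  # list of (upstream, downstream) pairs


class ToyTask:
    """Hypothetical stand-in for a task class such as DatabricksRunNow."""

    def __init__(self, name, flow):
        self.name = name
        self.flow = flow

    def __call__(self, **kwargs):
        # Assumption being modelled: calling the task returns a *copy*
        # bound to the given arguments, not the original object.
        new = copy.copy(self)
        new.kwargs = kwargs
        self.flow.tasks.append(new)
        return new

    def set_upstream(self, other):
        # Record that `other` must finish before `self` starts.
        self.flow.edges.append((other, self))


def has_edge(flow, upstream, downstream):
    """True if the flow has an edge between exactly these two objects."""
    return any(u is upstream and d is downstream for u, d in flow.edges)


flow = ToyFlow()
bronze = ToyTask("bronze", flow)
silver = ToyTask("silver", flow)

# Pattern from the question: the dependency is set on the originals.
silver.set_upstream(bronze)
a = bronze(databricks_conn_secret="conn")
b = silver(databricks_conn_secret="conn")
template_only = has_edge(flow, a, b)  # the called copies are not linked

# Pattern from the answer: the dependency is set on the called copies.
b.set_upstream(a)
after_fix = has_edge(flow, a, b)

print(template_only, after_fix)
```

In this toy, the edge created before the calls links only the original objects, which is consistent with silver or gold starting before bronze in the first version of the flow.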
g
hi @Kevin that did the trick, thanks!
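If the pipeline grows beyond three stages, the pairwise set_upstream calls can be wrapped in a small helper. The sketch below is only an illustration: chain is a hypothetical helper, not part of Prefect, and FakeTask exists only so the example runs without Prefect installed. The helper relies on nothing but each object having a set_upstream method.

```python
from typing import Any


def chain(*tasks: Any) -> None:
    """Make each task depend on the one listed just before it."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        downstream.set_upstream(upstream)


class FakeTask:
    """Hypothetical stand-in for a called task (a, b, c in the flow above)."""

    def __init__(self, name):
        self.name = name
        self.upstream = []

    def set_upstream(self, other):
        self.upstream.append(other)


a, b, c = FakeTask("bronze"), FakeTask("silver"), FakeTask("gold")
chain(a, b, c)

# Each entry pairs a task name with the names of its direct upstream tasks.
order = [(t.name, [u.name for u in t.upstream]) for t in (a, b, c)]
print(order)
```

In the flow above, chain(a, b, c) placed inside the with Flow(...) block would stand in for the two set_upstream calls, assuming set_upstream picks up the active flow from the context as it does in the answer.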