# prefect-community
e
Hopefully an easy one. I have a flow that needs to run 3 functions/tasks in order and I can't seem to find the correct syntax to make them run sequentially. Currently, I have simple code that looks like this:
```python
from prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtCloudRunJob
from prefect.tasks.databricks import DatabricksSubmitRun
from prefect.tasks.fivetran import FivetranSyncTask
from prefect.tasks.secrets import PrefectSecret

@task
def fivetran_task():
    fivetran_sync_task = FivetranSyncTask(connector_id="A_CONNECTOR")
    api_key = PrefectSecret('five-tran-key')
    api_secret = PrefectSecret('five-tran-secret')
    poll_seconds = Parameter(name="poll_seconds", default=60)
    return fivetran_sync_task(
        api_key=api_key, api_secret=api_secret, poll_status_every_n_seconds=poll_seconds
    )

@task
def databricks_task():
    notebook_task = {
        "notebook_path": "MY NOTEBOOK",
    }
    json = {
        'existing_cluster_id': 'MY CLUSTER',
        'notebook_task': notebook_task
    }
    connection = PrefectSecret('databricks-connection-string')
    notebook_run = DatabricksSubmitRun(json=json)
    return notebook_run(databricks_conn_secret=connection)

@task
def dbt_task():
    dbt_job = DbtCloudRunJob()
    api_token = PrefectSecret('dbt-token')
    return dbt_job(cause='Prefect test.', token=api_token, account_id=1234, job_id=1234, wait_for_job_run_completion=True, max_wait_time=600)

with Flow("e2e-flow") as flow:
    fivetran_status = fivetran_task().run()
    databricks_status = databricks_task().run()
    dbt_status = dbt_task().run()

if __name__ == "__main__":
    flow.register(project_name="e2e-poc")
```
If I run this, I end up with
So (a) the functions show as failed, but (b) the tasks within the functions later seem to run, just out of order... What I would like is:

• run fivetran_task
• if successful, run databricks_task
• if successful, run dbt_task

Thanks in advance!
k
See this, and don’t call `run` in the Flow; Prefect will do it for you.
```python
with Flow("e2e-flow") as flow:
    fivetran_status = fivetran_task()
    databricks_status = databricks_task()
    dbt_status = dbt_task()
```
e
I've tried that as well. When I do so, I get this:
k
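Ah, do:
```python
from prefect import task
from prefect.client import Secret
from prefect.tasks.dbt import DbtCloudRunJob

@task
def dbt_task():
    dbt_job = DbtCloudRunJob()
    api_token = Secret('dbt-token').get()  # plain value, not a PrefectSecret task
    return dbt_job.run(cause='Prefect test.', token=api_token, account_id=1234, job_id=1234, wait_for_job_run_completion=True, max_wait_time=600)
```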
e
Unfortunately, that still doesn't work. FWIW, I'm using Prefect 1.0, and I simply need to be able to execute a flow that hits these tools in this exact order: Fivetran -> Databricks -> dbt.
k
What error do you get?
You need to call `.run()` for all of the task definitions.
Ah, I read more thoroughly. There are many things off with the original script:

1. Don’t use `PrefectSecret` inside a task. It is also a task; use `Secret(name).get()` instead.
2. You can’t use `Parameter` inside a task. It is also a task. Use it at the Flow level instead.
3. You need to call `.run()` in all of your returns to use a task inside another task.
I think this should be a lot closer to working:
```python
from prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtCloudRunJob
from prefect.tasks.databricks import DatabricksSubmitRun
from prefect.tasks.fivetran import FivetranSyncTask
from prefect.client import Secret

@task
def fivetran_task(poll_seconds):
    fivetran_sync_task = FivetranSyncTask(connector_id="A_CONNECTOR")
    api_key = Secret('five-tran-key').get()
    api_secret = Secret('five-tran-secret').get()
    return fivetran_sync_task.run(
        api_key=api_key, api_secret=api_secret, poll_status_every_n_seconds=poll_seconds
    )

@task
def databricks_task():
    notebook_task = {
        "notebook_path": "MY NOTEBOOK",
    }
    json = {
        'existing_cluster_id': 'MY CLUSTER',
        'notebook_task': notebook_task
    }
    connection = Secret('databricks-connection-string').get()
    notebook_run = DatabricksSubmitRun(json=json)
    return notebook_run.run(databricks_conn_secret=connection)

@task
def dbt_task():
    dbt_job = DbtCloudRunJob()
    api_token = Secret('dbt-token').get()
    return dbt_job.run(cause='Prefect test.', token=api_token, account_id=1234, job_id=1234, wait_for_job_run_completion=True, max_wait_time=600)

with Flow("e2e-flow") as flow:
    poll_seconds = Parameter(name="poll_seconds", default=60)
    fivetran_status = fivetran_task(poll_seconds)
    databricks_status = databricks_task()
    dbt_status = dbt_task()

if __name__ == "__main__":
    flow.register(project_name="e2e-poc")
```
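To make the rules above a bit more concrete, here is a toy model in plain Python. It is not Prefect's implementation and does not import Prefect; `ToyFlow`, `ToyTask`, and the sample tasks are hypothetical names. It assumes the behavior described in this thread: calling a task inside a `with Flow(...)` block only adds it to the flow, `.run()` executes it on the spot, and the flow's tasks execute later, after the `with` block has closed. In this toy, a task that is *called* (rather than `.run()`) from inside another task's body therefore has no flow to attach to.

```python
# Toy model, NOT Prefect's implementation: all names are hypothetical.
# It only illustrates "calling a task registers it; .run() executes it".

_active_flow = None  # the flow whose `with` block is currently open


class ToyFlow:
    def __init__(self, name):
        self.name = name
        self.tasks = []  # tasks registered while the `with` block was open

    def __enter__(self):
        global _active_flow
        _active_flow = self
        return self

    def __exit__(self, exc_type, exc, tb):
        global _active_flow
        _active_flow = None
        return False  # never swallow exceptions

    def run(self):
        # Executes the registered tasks, after the `with` block has closed
        return [task.run() for task in self.tasks]


class ToyTask:
    def __init__(self, fn):
        self.fn = fn

    def run(self):
        # Executes the wrapped function right now, flow or no flow
        return self.fn()

    def __call__(self):
        # Only registers the task, and only while a flow block is open
        if _active_flow is None:
            raise ValueError("no active flow; use .run() inside another task")
        _active_flow.tasks.append(self)
        return self


inner = ToyTask(lambda: "inner ran")
outer_good = ToyTask(lambda: inner.run())  # nested .run(): executes inline
outer_bad = ToyTask(lambda: inner())       # nested call: tries to register

with ToyFlow("good") as flow:
    outer_good()
print(flow.run())  # ['inner ran']

with ToyFlow("bad") as bad_flow:
    outer_bad()
try:
    bad_flow.run()
except ValueError as exc:
    print("bad flow:", exc)
```

The failure mode of real Prefect 1.x may look different from this toy's `ValueError`; the point is only the split the thread describes: in the Flow block you call tasks, and inside a task body you call `.run()`.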
e
Using `Secret` still didn't work for me (see errors), but the bullets helped me understand somewhat what was going on. I just got something working, but it feels really hacky. To get the tasks to execute in the order I want, I had to pass the status of the previous task into the next task and call `print(status)`. This feels kind of gross: I have to pass in a status just to print it, so that each task is treated as needing the result of the previous one, which it doesn't, other than to print it.

So my question is: is this not a core scenario? I would expect any orchestration tool to handle sequential dependency management within a given flow. In this case, I need to land data from Fivetran, then run some code in Databricks, and then run ETL in dbt.
k
Ah, I see. I think I posted the FAQ link above. It’s just:
```python
with Flow("e2e-flow") as flow:
    poll_seconds = Parameter(name="poll_seconds", default=60)
    fivetran_status = fivetran_task(poll_seconds)
    databricks_status = databricks_task(upstream_tasks=[fivetran_status])
    dbt_status = dbt_task(upstream_tasks=[databricks_status])
```
Or:
```python
with Flow("e2e-flow") as flow:
    poll_seconds = Parameter(name="poll_seconds", default=60)
    fivetran_status = fivetran_task(poll_seconds)
    databricks_status = databricks_task()
    dbt_status = dbt_task()

    databricks_status.set_upstream(fivetran_status)
    dbt_status.set_upstream(databricks_status)
```
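Both snippets declare the same edges: fivetran -> databricks -> dbt, with no data passed between the tasks. As a rough mental model (plain Python with the standard library's `graphlib`, Python 3.9+, not Prefect itself), those edges alone are enough to fix the execution order, and with Prefect 1.x's default `all_successful` trigger a task whose upstream failed is not run. `run_pipeline`, `make_step`, and the step names are hypothetical; the `"TriggerFailed"` label borrows the name of the Prefect 1.x state for that case.

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+


def run_pipeline(steps, upstream):
    """Run `steps` (name -> callable) in dependency order.

    `upstream` maps a step name to the names it must wait for, playing the
    role of upstream_tasks=[...] or set_upstream(...). A step runs only if
    every upstream step succeeded, like an all_successful trigger.
    """
    sorter = TopologicalSorter()
    for name in steps:
        sorter.add(name, *upstream.get(name, ()))

    states = {}
    for name in sorter.static_order():
        if any(states[dep] != "Success" for dep in upstream.get(name, ())):
            states[name] = "TriggerFailed"  # an upstream step did not succeed
            continue
        try:
            steps[name]()
            states[name] = "Success"
        except Exception:
            states[name] = "Failed"
    return states


def make_step(name, log, fail=False):
    # Hypothetical stand-in for a Fivetran/Databricks/dbt call
    def step():
        log.append(name)
        if fail:
            raise RuntimeError(f"{name} failed")
    return step


upstream = {"databricks": ["fivetran"], "dbt": ["databricks"]}

# Steps are listed out of order on purpose: only the edges decide the order
log = []
steps = {name: make_step(name, log) for name in ["dbt", "databricks", "fivetran"]}
print(run_pipeline(steps, upstream))
print(log)  # ['fivetran', 'databricks', 'dbt']

# If Databricks fails, dbt is never attempted
log2 = []
steps2 = {
    "fivetran": make_step("fivetran", log2),
    "databricks": make_step("databricks", log2, fail=True),
    "dbt": make_step("dbt", log2),
}
print(run_pipeline(steps2, upstream))
print(log2)  # ['fivetran', 'databricks']
```

This is why the `print(status)` workaround worked too: passing a result also creates an edge. Declaring the edge explicitly gives the same ordering without the fake data dependency.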
e
Given that I can't use `Parameter` or `PrefectSecret` inside the task, I also have to pass in arguments, so my version looks like this:
```python
with Flow("e2e-flow") as flow:
    fivetran_api_key = PrefectSecret('five-tran-key')
    fivetran_api_secret = PrefectSecret('five-tran-secret')
    fivetran_poll_seconds = Parameter(name="poll_seconds", default=60)
    fivetran_status = fivetran_task(fivetran_api_key, fivetran_api_secret, fivetran_poll_seconds)
    databricks_connection_string = PrefectSecret('databricks-connection-string')
    databricks_status = databricks_task(databricks_connection_string, upstream_tasks=[fivetran_status])
    dbt_api_token = PrefectSecret('dbt-token')
    dbt_status = dbt_task(dbt_api_token, upstream_tasks=[databricks_status])
```
This results in the attached error.
Doing it the second way:
```python
with Flow("e2e-flow") as flow:
    fivetran_api_key = PrefectSecret('five-tran-key')
    fivetran_api_secret = PrefectSecret('five-tran-secret')
    fivetran_poll_seconds = Parameter(name="poll_seconds", default=60)
    fivetran_status = fivetran_task(fivetran_api_key, fivetran_api_secret, fivetran_poll_seconds)
    databricks_connection_string = PrefectSecret('databricks-connection-string')
    databricks_status = databricks_task(databricks_connection_string)
    databricks_status.set_upstream(fivetran_status)
    dbt_api_token = PrefectSecret('dbt-token')
    dbt_status = dbt_task(dbt_api_token)
    dbt_status.set_upstream(databricks_status)
```
And the attached error:
I'm sorry to be such a pain on this... Is there documentation that outlines how to build what I'm trying to do? In hunting through your documentation, I only see things around single executions or flows of flows.
k
I am a bit confused because:
```python
@task
def fivetran_task(poll_seconds):
    fivetran_sync_task = FivetranSyncTask(connector_id="A_CONNECTOR")
    api_key = Secret('five-tran-key').get()
    api_secret = Secret('five-tran-secret').get()
    return fivetran_sync_task.run(
        api_key=api_key, api_secret=api_secret, poll_status_every_n_seconds=poll_seconds
    )
```
just takes one argument, but:
```python
fivetran_status = fivetran_task(fivetran_api_key, fivetran_api_secret, fivetran_poll_seconds)
```
tries to pass in three? Or did you change the function to accept more?
I think the Prefect part is already right; it’s just a Python error, so there’s no specific doc to share.
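For a mismatch like this between a call site and a function signature, one quick plain-Python check before registering is `inspect.signature(...).bind(...)`, which validates the arguments without calling the function. The two `fivetran_task_*` functions below are hypothetical stand-ins for the before and after versions in this thread, and the three-parameter signature is an assumption about what the fix looked like.

```python
import inspect


# Hypothetical stand-ins for the "before" and "after" versions in this thread
def fivetran_task_old(poll_seconds):
    return poll_seconds


def fivetran_task_fixed(api_key, api_secret, poll_seconds):
    return api_key, api_secret, poll_seconds


args = ("key", "secret", 60)  # three values, like the call in the flow

# Signature.bind checks a call against a signature without running the function
try:
    inspect.signature(fivetran_task_old).bind(*args)
except TypeError as exc:
    print("old signature rejects the call:", exc)

bound = inspect.signature(fivetran_task_fixed).bind(*args)
print(dict(bound.arguments))  # {'api_key': 'key', 'api_secret': 'secret', 'poll_seconds': 60}
```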
e
*facepalm* Wow... sorry, I missed the obvious. I did indeed update the call to pass in three args, but I failed to actually add one of them to the function signature... I'm running it now and it looks to be working. Thanks for the patience and help!
k
Of course! No worries!