Emerson Franks
07/18/2022, 11:39 PMfrom prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtCloudRunJob
from prefect.tasks.databricks import DatabricksSubmitRun
from prefect.tasks.fivetran import FivetranSyncTask
from prefect.tasks.secrets import PrefectSecret
@task
def fivetran_task():
fivetran_sync_task = FivetranSyncTask(connector_id="A_CONNECTOR")
api_key = PrefectSecret('five-tran-key')
api_secret = PrefectSecret('five-tran-secret')
poll_seconds = Parameter(name="poll_seconds", default=60)
return fivetran_sync_task(
api_key=api_key, api_secret=api_secret, poll_status_every_n_seconds=poll_seconds
)
@task
def databricks_task():
notebook_task = {
"notebook_path": "MY NOTEBOOK",
}
json = {
'existing_cluster_id': 'MY CLUSTER',
'notebook_task': notebook_task
}
connection = PrefectSecret('databricks-connection-string')
notebook_run = DatabricksSubmitRun(json=json)
return notebook_run(databricks_conn_secret=connection)
@task
def dbt_task():
dbt_job = DbtCloudRunJob()
api_token = PrefectSecret('dbt-token')
return dbt_job(cause='Prefect test.', token=api_token, account_id=1234, job_id=1234, wait_for_job_run_completion=True, max_wait_time=600)
with Flow("e2e-flow") as flow:
fivetran_status = fivetran_task().run()
databricks_status = databricks_task().run()
dbt_status = dbt_task().run()
if __name__ == "__main__":
flow.register(project_name="e2e-poc")
Kevin Kho
07/19/2022, 12:19 AMrun
in the Flow, Prefect will do it for youwith Flow("e2e-flow") as flow:
fivetran_status = fivetran_task()
databricks_status = databricks_task()
dbt_status = dbt_task()
Emerson Franks
07/19/2022, 3:43 AMKevin Kho
07/19/2022, 3:55 AM@task
def dbt_task():
dbt_job = DbtCloudRunJob()
api_token = Secret('dbt-token').get()
return dbt_job.run(cause='Prefect test.', token=api_token, account_id=1234, job_id=1234, wait_for_job_run_completion=True, max_wait_time=600)
Emerson Franks
07/19/2022, 4:24 PMKevin Kho
07/19/2022, 4:52 PM.run()
for all of the task definitionsfrom asyncore import poll
from prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtCloudRunJob
from prefect.tasks.databricks import DatabricksSubmitRun
from prefect.tasks.fivetran import FivetranSyncTask
from prefect.tasks.secrets import PrefectSecret
from prefect.client import Secret
@task
def fivetran_task(poll_seconds):
fivetran_sync_task = FivetranSyncTask(connector_id="A_CONNECTOR")
api_key = Secret('five-tran-key').get()
api_secret = Secret('five-tran-secret').get()
return fivetran_sync_task.run(
api_key=api_key, api_secret=api_secret, poll_status_every_n_seconds=poll_seconds
)
@task
def databricks_task():
notebook_task = {
"notebook_path": "MY NOTEBOOK",
}
json = {
'existing_cluster_id': 'MY CLUSTER',
'notebook_task': notebook_task
}
connection = Secret('databricks-connection-string').get()
notebook_run = DatabricksSubmitRun(json=json)
return notebook_run.run(databricks_conn_secret=connection)
@task
def dbt_task():
dbt_job = DbtCloudRunJob()
api_token = Secret('dbt-token').get()
return dbt_job.run(cause='Prefect test.', token=api_token, account_id=1234, job_id=1234, wait_for_job_run_completion=True, max_wait_time=600)
with Flow("e2e-flow") as flow:
poll_seconds = Parameter(name="poll_seconds", default=60)
fivetran_status = fivetran_task(poll_seconds)
databricks_status = databricks_task()
dbt_status = dbt_task()
if __name__ == "__main__":
flow.register(project_name="e2e-poc")
Emerson Franks
07/19/2022, 5:20 PMKevin Kho
07/19/2022, 5:23 PMwith Flow("e2e-flow") as flow:
poll_seconds = Parameter(name="poll_seconds", default=60)
fivetran_status = fivetran_task(poll_seconds)
databricks_status = databricks_task(upstream_tasks=[fivetran_status])
dbt_status = dbt_task(upstream_tasks=[databricks_status])
with Flow("e2e-flow") as flow:
poll_seconds = Parameter(name="poll_seconds", default=60)
fivetran_status = fivetran_task(poll_seconds)
databricks_status = databricks_task()
dbt_status = dbt_task()
databricks_status.set_upstream(fivetran_status)
dbt_status.set_upstream(databricks_status)
Emerson Franks
07/19/2022, 5:37 PMwith Flow("e2e-flow") as flow:
fivetran_api_key = PrefectSecret('five-tran-key')
fivetran_api_secret = PrefectSecret('five-tran-secret')
fivetran_poll_seconds = Parameter(name="poll_seconds", default=60)
fivetran_status = fivetran_task(fivetran_api_key, fivetran_api_secret, fivetran_poll_seconds)
databricks_connection_string=PrefectSecret('databricks-connection-string')
databricks_status = databricks_task(databricks_connection_string, upstream_tasks=[fivetran_status])
dbt_api_token = PrefectSecret('dbt-token')
dbt_status = dbt_task(dbt_api_token, upstream_tasks=[databricks_status])
Which nets in the attached error.with Flow("e2e-flow") as flow:
fivetran_api_key = PrefectSecret('five-tran-key')
fivetran_api_secret = PrefectSecret('five-tran-secret')
fivetran_poll_seconds = Parameter(name="poll_seconds", default=60)
fivetran_status = fivetran_task(fivetran_api_key, fivetran_api_secret, fivetran_poll_seconds)
databricks_connection_string=PrefectSecret('databricks-connection-string')
databricks_status = databricks_task(databricks_connection_string)
databricks_status.set_upstream(fivetran_status)
dbt_api_token = PrefectSecret('dbt-token')
dbt_status = dbt_task(dbt_api_token)
dbt_status.set_upstream(databricks_status)
And the attached error:Kevin Kho
07/19/2022, 5:40 PM@task
def fivetran_task(poll_seconds):
fivetran_sync_task = FivetranSyncTask(connector_id="A_CONNECTOR")
api_key = Secret('five-tran-key').get()
api_secret = Secret('five-tran-secret').get()
return fivetran_sync_task.run(
api_key=api_key, api_secret=api_secret, poll_status_every_n_seconds=poll_seconds
)
just takes one argument but:
fivetran_status = fivetran_task(fivetran_api_key, fivetran_api_secret, fivetran_poll_seconds)
tries to pass in 3? or did you change the function to accept more?Emerson Franks
07/19/2022, 5:43 PMKevin Kho
07/19/2022, 5:44 PM