Emerson Franks
07/18/2022, 11:39 PM
Emerson Franks
07/18/2022, 11:39 PMfrom prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtCloudRunJob
from prefect.tasks.databricks import DatabricksSubmitRun
from prefect.tasks.fivetran import FivetranSyncTask
from prefect.tasks.secrets import PrefectSecret
@task
def fivetran_task():
    """Trigger a Fivetran sync for connector "A_CONNECTOR" (first draft).

    NOTE(review): this draft calls the FivetranSyncTask instance directly and
    creates PrefectSecret / Parameter objects inside the task body. The
    corrected version later in this thread reads secrets with
    ``Secret(name).get()``, calls ``fivetran_sync_task.run(...)``, and declares
    the ``poll_seconds`` Parameter in the Flow context, passing it in.
    """
    fivetran_sync_task = FivetranSyncTask(connector_id="A_CONNECTOR")
    api_key = PrefectSecret('five-tran-key')
    api_secret = PrefectSecret('five-tran-secret')
    # Status-polling interval for the sync; defaults to 60 (seconds, per the kwarg name).
    poll_seconds = Parameter(name="poll_seconds", default=60)
    return fivetran_sync_task(
        api_key=api_key, api_secret=api_secret, poll_status_every_n_seconds=poll_seconds
    )
@task
def databricks_task():
    """Submit a Databricks run of notebook "MY NOTEBOOK" on cluster "MY CLUSTER" (first draft).

    NOTE(review): like the Fivetran draft, this uses PrefectSecret and calls the
    DatabricksSubmitRun instance directly inside a task. The corrected version
    later in this thread uses ``Secret(...).get()`` and ``notebook_run.run(...)``.
    """
    # Notebook path and cluster id look like placeholders -- presumably replaced with real values.
    notebook_task = {
        "notebook_path": "MY NOTEBOOK",
    }
    # Payload handed to DatabricksSubmitRun(json=...). NOTE: the local name
    # `json` shadows the stdlib module name inside this function.
    json = {
        'existing_cluster_id': 'MY CLUSTER',
        'notebook_task': notebook_task
    }
    connection = PrefectSecret('databricks-connection-string')
    notebook_run = DatabricksSubmitRun(json=json)
    return notebook_run(databricks_conn_secret=connection)
@task
def dbt_task():
    """Trigger dbt Cloud job 1234 (account 1234) and wait for it to finish (first draft).

    NOTE(review): a later reply reads the token with ``Secret('dbt-token').get()``
    and calls ``dbt_job.run(...)`` instead of calling the task instance with a
    PrefectSecret.
    """
    dbt_job = DbtCloudRunJob()
    api_token = PrefectSecret('dbt-token')
    # account_id / job_id look like placeholders; max_wait_time=600 presumably
    # bounds the wait for the job run (units not shown here).
    return dbt_job(cause='Prefect test.', token=api_token, account_id=1234, job_id=1234, wait_for_job_run_completion=True, max_wait_time=600)
with Flow("e2e-flow") as flow:
    # NOTE(review): `.run()` calls each task function right away while the flow
    # is being built, instead of leaving execution to Prefect; the reply below
    # drops `.run()` here. No dependencies are declared between the three tasks.
    fivetran_status = fivetran_task().run()
    databricks_status = databricks_task().run()
    dbt_status = dbt_task().run()

if __name__ == "__main__":
    # Register the flow under the Prefect project "e2e-poc".
    flow.register(project_name="e2e-poc")
Emerson Franks
07/18/2022, 11:40 PM
Emerson Franks
07/18/2022, 11:42 PM
Kevin Kho
You don't need to call `run` in the Flow; Prefect will do it for you.
Kevin Kho
with Flow("e2e-flow") as flow:
    # Call the tasks without `.run()`: inside the Flow context this adds them to
    # the flow, and Prefect runs them when the flow itself runs.
    fivetran_status = fivetran_task()
    databricks_status = databricks_task()
    dbt_status = dbt_task()
Emerson Franks
07/19/2022, 3:43 AM
Kevin Kho
@task
def dbt_task():
    """Trigger dbt Cloud job 1234 (account 1234) and wait for the run to complete.

    Inside a task body, the secret value is fetched with ``Secret(name).get()``
    (``Secret`` is imported from ``prefect.client`` in the full example), and
    the DbtCloudRunJob task is executed with ``.run(...)`` rather than called.
    """
    dbt_job = DbtCloudRunJob()
    api_token = Secret('dbt-token').get()
    return dbt_job.run(cause='Prefect test.', token=api_token, account_id=1234, job_id=1234, wait_for_job_run_completion=True, max_wait_time=600)
Emerson Franks
07/19/2022, 4:24 PM
Kevin Kho
Kevin Kho
Use `.run()` inside all of the task definitions.
Kevin Kho
Kevin Kho
from asyncore import poll
from prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtCloudRunJob
from prefect.tasks.databricks import DatabricksSubmitRun
from prefect.tasks.fivetran import FivetranSyncTask
from prefect.tasks.secrets import PrefectSecret
from prefect.client import Secret
@task
def fivetran_task(poll_seconds):
    """Run a Fivetran sync for connector "A_CONNECTOR".

    Args:
        poll_seconds: status-polling interval forwarded as
            ``poll_status_every_n_seconds``; the flow supplies it from the
            ``poll_seconds`` Parameter.

    Returns:
        Whatever ``FivetranSyncTask.run`` returns.
    """
    fivetran_sync_task = FivetranSyncTask(connector_id="A_CONNECTOR")
    # Secret values are read inside the task body with Secret(...).get().
    api_key = Secret('five-tran-key').get()
    api_secret = Secret('five-tran-secret').get()
    return fivetran_sync_task.run(
        api_key=api_key, api_secret=api_secret, poll_status_every_n_seconds=poll_seconds
    )
@task
def databricks_task():
    """Submit a Databricks run of notebook "MY NOTEBOOK" on cluster "MY CLUSTER".

    Returns:
        Whatever ``DatabricksSubmitRun.run`` returns.
    """
    # Notebook path and cluster id look like placeholders -- presumably replaced with real values.
    notebook_task = {
        "notebook_path": "MY NOTEBOOK",
    }
    # Payload handed to DatabricksSubmitRun(json=...). NOTE: the local name
    # `json` shadows the stdlib module name inside this function.
    json = {
        'existing_cluster_id': 'MY CLUSTER',
        'notebook_task': notebook_task
    }
    connection = Secret('databricks-connection-string').get()
    notebook_run = DatabricksSubmitRun(json=json)
    return notebook_run.run(databricks_conn_secret=connection)
@task
def dbt_task():
    """Trigger dbt Cloud job 1234 (account 1234) and wait for the run to complete.

    Returns:
        Whatever ``DbtCloudRunJob.run`` returns.
    """
    dbt_job = DbtCloudRunJob()
    api_token = Secret('dbt-token').get()
    # account_id / job_id look like placeholders; max_wait_time=600 presumably
    # bounds the wait for the job run (units not shown here).
    return dbt_job.run(cause='Prefect test.', token=api_token, account_id=1234, job_id=1234, wait_for_job_run_completion=True, max_wait_time=600)
with Flow("e2e-flow") as flow:
    # The Parameter is declared in the Flow context (default 60) and passed to the task.
    poll_seconds = Parameter(name="poll_seconds", default=60)
    fivetran_status = fivetran_task(poll_seconds)
    # NOTE(review): no dependencies are declared between the three tasks; the
    # following replies chain them with upstream_tasks / set_upstream.
    databricks_status = databricks_task()
    dbt_status = dbt_task()

if __name__ == "__main__":
    # Register the flow under the Prefect project "e2e-poc".
    flow.register(project_name="e2e-poc")
Emerson Franks
07/19/2022, 5:20 PM
Kevin Kho
with Flow("e2e-flow") as flow:
    poll_seconds = Parameter(name="poll_seconds", default=60)
    fivetran_status = fivetran_task(poll_seconds)
    # Chain the tasks Fivetran -> Databricks -> dbt via upstream_tasks at call time.
    databricks_status = databricks_task(upstream_tasks=[fivetran_status])
    dbt_status = dbt_task(upstream_tasks=[databricks_status])
Kevin Kho
with Flow("e2e-flow") as flow:
    poll_seconds = Parameter(name="poll_seconds", default=60)
    fivetran_status = fivetran_task(poll_seconds)
    databricks_status = databricks_task()
    dbt_status = dbt_task()
    # Same Fivetran -> Databricks -> dbt ordering, declared after the calls
    # with set_upstream instead of upstream_tasks=.
    databricks_status.set_upstream(fivetran_status)
    dbt_status.set_upstream(databricks_status)
Emerson Franks
07/19/2022, 5:37 PM
with Flow("e2e-flow") as flow:
    # NOTE(review): secrets and the Parameter are passed into the tasks as
    # arguments here, but the task definitions in the earlier full example take
    # at most one (`fivetran_task(poll_seconds)`, `databricks_task()`,
    # `dbt_task()`). Unless those signatures were changed too, these calls do
    # not match them.
    fivetran_api_key = PrefectSecret('five-tran-key')
    fivetran_api_secret = PrefectSecret('five-tran-secret')
    fivetran_poll_seconds = Parameter(name="poll_seconds", default=60)
    fivetran_status = fivetran_task(fivetran_api_key, fivetran_api_secret, fivetran_poll_seconds)
    databricks_connection_string=PrefectSecret('databricks-connection-string')
    databricks_status = databricks_task(databricks_connection_string, upstream_tasks=[fivetran_status])
    dbt_api_token = PrefectSecret('dbt-token')
    dbt_status = dbt_task(dbt_api_token, upstream_tasks=[databricks_status])
Which results in the attached error.
Emerson Franks
07/19/2022, 5:38 PM
with Flow("e2e-flow") as flow:
    # NOTE(review): same argument-passing as the previous attempt, with
    # set_upstream instead of upstream_tasks=. The calls pass more arguments
    # than the earlier task definitions accept, unless those were changed too.
    fivetran_api_key = PrefectSecret('five-tran-key')
    fivetran_api_secret = PrefectSecret('five-tran-secret')
    fivetran_poll_seconds = Parameter(name="poll_seconds", default=60)
    fivetran_status = fivetran_task(fivetran_api_key, fivetran_api_secret, fivetran_poll_seconds)
    databricks_connection_string=PrefectSecret('databricks-connection-string')
    databricks_status = databricks_task(databricks_connection_string)
    databricks_status.set_upstream(fivetran_status)
    dbt_api_token = PrefectSecret('dbt-token')
    dbt_status = dbt_task(dbt_api_token)
    dbt_status.set_upstream(databricks_status)
And the attached error:
Emerson Franks
07/19/2022, 5:40 PM
Kevin Kho
@task
def fivetran_task(poll_seconds):
    """Run a Fivetran sync for connector "A_CONNECTOR" (quoted for comparison).

    Takes a single argument, ``poll_seconds``, forwarded as
    ``poll_status_every_n_seconds``; the API key and secret are read inside
    the task with ``Secret(...).get()`` rather than passed in.
    """
    fivetran_sync_task = FivetranSyncTask(connector_id="A_CONNECTOR")
    api_key = Secret('five-tran-key').get()
    api_secret = Secret('five-tran-secret').get()
    return fivetran_sync_task.run(
        api_key=api_key, api_secret=api_secret, poll_status_every_n_seconds=poll_seconds
    )
This function takes just one argument, but:
# Three positional arguments passed to a task defined with one parameter.
fivetran_status = fivetran_task(fivetran_api_key, fivetran_api_secret, fivetran_poll_seconds)
tries to pass in three? Or did you change the function to accept more?
Kevin Kho
Emerson Franks
07/19/2022, 5:43 PM
Kevin Kho