Matt Delacour
07/20/2022, 6:15 PM
Billy McMonagle
07/20/2022, 6:19 PM
hourly, daily, etc and run separate prefect flows for each tag (e.g. dbt run --select tag:hourly
import pendulum
from prefect import Flow, Parameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect.triggers import all_finished

# download_manifest, upload_manifest, execute_dbt and slack_failure_notification
# are project-specific helpers that are not shown in this thread (Prefect 1.x API).

# Hourly from 8am Eastern through the end of the day, plus 4am
weekdays = CronClock("0 4,8-23 * * 1-5", start_date=pendulum.now("America/New_York"))
# every six hours on weekends
weekends = CronClock("0 */6 * * 0,6", start_date=pendulum.now("America/New_York"))
schedule = Schedule(clocks=[weekdays, weekends])

with Flow(
    "dbt Flow", schedule=schedule, state_handlers=[slack_failure_notification]
) as flow:
    # Runtime parameters, forwarded to the main dbt build below
    select = Parameter("select", default=None)
    exclude = Parameter("exclude", default=None)
    full_refresh = Parameter("full_refresh", default=False)

    manifest = download_manifest(task_args={"name": "Download Manifest"})
    dbt_dependencies = execute_dbt(
        task_args={"name": "dbt Dependencies"}, command="dbt deps"
    )

    # Main build: waits for both `dbt deps` and the manifest download
    dbt_build = execute_dbt(
        task_args={"name": "dbt Build"},
        command="dbt build --exclude dbt_artifacts",
        select=select,
        exclude=exclude,
        full_refresh=full_refresh,
        upstream_tasks=[dbt_dependencies, manifest],
    )
    upload_manifest(task_args={"name": "Upload Manifest"}, upstream_tasks=[dbt_build])

    # Upload artifacts even if the build failed (all_finished trigger)
    dbt_artifacts = execute_dbt(
        task_args={"name": "dbt Artifacts", "trigger": all_finished},
        command="dbt --no-write-json run-operation upload_dbt_artifacts_v2",
        upstream_tasks=[dbt_build],
    )
    execute_dbt(
        task_args={"name": "dbt Artifacts Build"},
        command="dbt build",
        select="dbt_artifacts",
        schema="dbt_artifacts_grebe",
        upstream_tasks=[dbt_artifacts],
    )

# The flow's final state is determined by the main build only
flow.set_reference_tasks([dbt_build])
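The execute_dbt helper used above is not shown in the thread. As a rough sketch only, here is one way such a helper might turn the select, exclude and full_refresh parameters into a dbt command line, including the per-tag selection mentioned earlier (tag:hourly). The function name build_dbt_command and its signature are hypothetical; only the --select, --exclude and --full-refresh flags are standard dbt CLI options.

```python
import shlex
from typing import List, Optional


def build_dbt_command(
    base: str,
    select: Optional[str] = None,
    exclude: Optional[str] = None,
    full_refresh: bool = False,
) -> List[str]:
    """Hypothetical helper: turn flow parameters into a dbt argument list."""
    args = shlex.split(base)  # e.g. "dbt run" -> ["dbt", "run"]
    if select:
        args += ["--select", select]
    if exclude:
        args += ["--exclude", exclude]
    if full_refresh:
        args.append("--full-refresh")
    return args


# One command per tag, as in the "separate flow for each tag" setup
hourly = build_dbt_command("dbt run", select="tag:hourly")
print(shlex.join(hourly))  # dbt run --select tag:hourly
```

Returning a list rather than a single string makes it straightforward to pass the result to subprocess.run without shell quoting issues.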
Matt Delacour
07/20/2022, 6:24 PM
Billy McMonagle
07/20/2022, 6:31 PM
Eduardo Nunez
07/20/2022, 6:55 PM
Billy McMonagle
07/20/2022, 7:02 PM
Matt Delacour
07/20/2022, 9:15 PM
dbt run
it will actually ask Prefect to run each Flow (e.g. flows-of-flows, the Prefect CLI, or GraphQL).
And so it would benefit from dbt creating the DAG in an optimized way, plus showing granular information about each dbt model in Prefect Flows.
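As an illustration of the idea above, the sketch below derives a model run order from dbt's manifest.json, which lists each node's parents under depends_on.nodes. It assumes one Prefect flow per model would then be triggered in that order; the function name model_run_order and the sample manifest are made up for the example, and the triggering step itself is left out.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def model_run_order(manifest: dict) -> List[str]:
    """Return dbt model ids so that every model comes after its parent models."""
    nodes = manifest.get("nodes", {})
    models = {
        uid for uid, node in nodes.items() if node.get("resource_type") == "model"
    }
    graph: Dict[str, Set[str]] = {}
    for uid in models:
        parents = nodes[uid].get("depends_on", {}).get("nodes", [])
        # Ignore sources, seeds, etc.; only model-to-model edges matter here
        graph[uid] = {p for p in parents if p in models}
    return list(TopologicalSorter(graph).static_order())


# Tiny hand-written stand-in for a real manifest.json
sample_manifest = {
    "nodes": {
        "model.shop.stg_orders": {
            "resource_type": "model",
            "depends_on": {"nodes": ["source.shop.raw.orders"]},
        },
        "model.shop.orders": {
            "resource_type": "model",
            "depends_on": {"nodes": ["model.shop.stg_orders"]},
        },
        "model.shop.revenue": {
            "resource_type": "model",
            "depends_on": {"nodes": ["model.shop.orders"]},
        },
    }
}

for model_id in model_run_order(sample_manifest):
    print(model_id)  # a per-model flow run could be requested here
```

graphlib requires Python 3.9 or later. For real projects the manifest would be loaded with json.load from target/manifest.json.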
Thoughts?
Darin Douglass
07/20/2022, 9:57 PM
Matt Delacour
07/20/2022, 10:01 PM
"Seems like a surefire way to have way too many flows in the UI."
Isn't that the goal of Prefect?
"We now just spawn the next run when the current one finishes."
Interesting. Could you elaborate on that?
Darin Douglass
07/20/2022, 10:18 PM
create_flow_run task that starts up the next run of the current flow.
Taylor Curran
10/08/2022, 3:53 PM