https://prefect.io logo
Title
m

Matt Delacour

07/20/2022, 6:15 PM
šŸ‘‹ Is anyone scheduling DBT with Prefect here? How do you deal with dependencies (upstream & downstream)? Do you handle the logic of "This model needs to run before this other model"? On our side, we want to leverage the DBT built-in features as well as the visualization of Prefect with 1 DBT model == 1 Prefect flow
b

Billy McMonagle

07/20/2022, 6:19 PM
We are using Prefect to run pretty much all of our dbt models as part of a single flow that runs hourly. This lets dbt do what dbt does best, which is to handle "this model needs to run before this other model". I was hesitant about this approach because dbt was new to me, but I think it's the best way to leverage what dbt and prefect are each good at. If and when we run into performance issues where some dbt models shouldn't be run as part of this single DAG, we are planning to tag the dbt models with tags like
hourly
,
daily
, etc and run separate prefect flows for each tag (e.g.
dbt run --select tag:hourly
:thank-you: 1
Here's what our dbt flow looks like, if that helps.
# Hourly from 8am Eastern through the end of the day, plus 4am
weekdays = CronClock("0 4,8-23 * * 1-5", start_date=pendulum.now("America/New_York"))
# every six hours on weekends
weekends = CronClock("0 */6 * * 0,6", start_date=pendulum.now("America/New_York"))
schedule = Schedule(clocks=[weekdays, weekends])

with Flow(
    "dbt Flow", schedule=schedule, state_handlers=[slack_failure_notification]
) as flow:
    select = Parameter("select", default=None)
    exclude = Parameter("exclude", default=None)
    full_refresh = Parameter("full_refresh", default=False)
    manifest = download_manifest(task_args={"name": "Download Manifest"})
    dbt_dependencies = execute_dbt(
        task_args={"name": "dbt Dependencies"}, command="dbt deps"
    )
    dbt_build = execute_dbt(
        task_args={"name": "dbt Build"},
        command=f"dbt build --exclude dbt_artifacts",
        select=select,
        exclude=exclude,
        full_refresh=full_refresh,
        upstream_tasks=[dbt_dependencies, manifest],
    )
    upload_manifest(task_args={"name": "Upload Manifest"}, upstream_tasks=[dbt_build])
    dbt_artifacts = execute_dbt(
        task_args={"name": "dbt Artifacts", "trigger": all_finished},
        command="dbt --no-write-json run-operation upload_dbt_artifacts_v2",
        upstream_tasks=[dbt_build],
    )
    execute_dbt(
        task_args={"name": "dbt Artifacts Build"},
        command="dbt build",
        select="dbt_artifacts",
        schema="dbt_artifacts_grebe",
        upstream_tasks=[dbt_artifacts],
    )
flow.set_reference_tasks([dbt_build])
m

Matt Delacour

07/20/2022, 6:24 PM
Thanks for sharing šŸ™ Questions with this approach 1. How many models do you have in production? 2. Does it mean that your cluster is very busy at the beginning of every hour and then is idle? 3. What happens when only one model fails? Do you get an alert by Prefect about the Flow failing? 4. Do you think you have enough visibility per model about which ones are constantly failing and which ones are healthy?
cc @Eduardo Nunez
b

Billy McMonagle

07/20/2022, 6:31 PM
Good questions! 1. 200+ models, 250+ tests right now 2. Yes, the full DAG takes about 20 minutes to run. We make liberal use of incremental models which has brought down execution time on some pretty slow models. We're on Snowflake so the spike usage is preferred. 3. We get Slack alerts when the flow fails. We're actively working to make this more useful, because the dbt logs are very verbose. The solution is probably going to be a separate prefect flow that examines dbt run metadata and alerts the owners of particular models. 4. We use a package called dbt artifacts to materialize useful metadata about the model runs. This is not ideal out of the box behavior but we're working on it.
šŸ’Æ 1
e

Eduardo Nunez

07/20/2022, 6:55 PM
@Billy McMonagle awesome stuff. @Matt Delacour and I are concerned on cramming together model runs at the start of hour / start of day given we're on Redshift and don't have flexible on-demand cluster scaling during these peaks. Would love to know if anyone here that's on Redshift has solved this problem. Also curious what your approach is to viewing the status of the model runs. Do you plan on building dashboards on top of artifacts ? It would be awesome if the Prefect cloud console gave us stats / statuses on model runs but this would require a 1-to-1 mapping of flow to model, which isnt ideal
b

Billy McMonagle

07/20/2022, 7:02 PM
I understand your problem, perhaps that's something to be mitigated by setting the number of dbt worker processes. We migrated from Redshift prior to implementing our dbt project šŸ˜„ . I do plan to build dashboards on top of artifacts, the data that package provides is quite useful. I should add, for understanding what happened in the latest run, I review logs we've shipped to Datadog because their interface is better for filtering very verbose log streams.
m

Matt Delacour

07/20/2022, 9:15 PM
Did anyone try the following logic: ā€¢ Register each DBT model as a Prefect Flow ā€¢ When you run
dbt run
it will actually ask Prefect to run each Flow (eg flows-of-flows or Prefect CLI or GraphQL) And so it would benefit from DBT creating the DAG in optimized way + showing granular information about each DBT model in Prefect Flows Thoughts?
d

Darin Douglass

07/20/2022, 9:57 PM
Seems like a surefire way to have way too many flows in the UI. We stopped scheduling the main sync because of the variance in run-time. We now just spawn the next run when the current one finishes. We send a slack message per failed model and will eventually highlight the longest running models in the same way (we do it adhoc at the moment)
:thank-you: 1
m

Matt Delacour

07/20/2022, 10:01 PM
Seems like a surefire way to have way too many flows in the UI.
Is not it the goal of Prefect? šŸ˜„
We now just spawn the next run when the current one finishes.
Interesting. Could you elaborate on that? šŸ™
d

Darin Douglass

07/20/2022, 10:18 PM
In a way yes, but in this case I think having so many registered flows (Im thinking of our dbt setup of 350+ and constantly growing) would be a very large hassle to load/explore/keep track of in the UI. Plus it means that you need to hook prefect into dbts internal mechanisms which feels hacky. I could maybe see doing a single task per model but flows seems heavy handed. As for spawning, after everything has ran its course, we just have a
create_flow_run
task that starts up the next run of the current flow.
t

Taylor Curran

10/08/2022, 3:53 PM
Hey just now stumbling upon this great dbt thread :dbt: Are any of you planning on attending DBT Coalesce in New Orleans 2 weeks from now?