# prefect-community
k
Hi! I’m currently orchestrating dbt via Prefect. The usual dbt flow uses pygit2 to pull from our main branch, then runs the dbt models and creates tables in our production schema in Snowflake. I want to create a tool where we can run 2 dbt instances in parallel: one that pulls from the main branch, and one that pulls from a dev branch (chosen by a user via parameters). Each of those would create tables in a separate schema in Snowflake. The purpose is to create 2 versions of certain tables to compare and see the effect of the dev branch. Now, my question is: what would be the best way to design the flow(s) for this? I’m not sure if I should create 1 flow and somehow pull 2 separate branches and have dbt run those 2 in parallel. Or create 1 parent flow that passes the user-inputted parameter into the child flows, then each child flow runs a dbt instance? Or something completely different? Sorry for the lengthy question!
I’m putting a lot of emphasis on running 2 dbt instances in parallel because I want to make sure all differences between the 2 sets of tables created are due to main vs dev code, rather than due to run time differences
a
I think this blog post shows something very similar to what you are trying to build. It accepts the repository name and branch name as parameters. You could add additional parameters based on your dbt environment.
btw this will be way easier to do in Prefect 2.0! so we definitely understand the problem, it's not easy to switch between environments 🙂
btw, there’s a somewhat similar question here - in case it may help too
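For a rough idea of the repository/branch-parameter pattern from that blog post (a hedged sketch, not the post’s actual code - the pull_branch task and parameter names are made up here):
Copy code
import pygit2
from prefect import Flow, Parameter, task

@task
def pull_branch(repo_url, branch):
    # clone the requested branch into a branch-specific directory and return its path
    repo = pygit2.clone_repository(repo_url, f"dbt_{branch}", checkout_branch=branch)
    return repo.workdir

with Flow("dbt-from-branch") as flow:
    repo_url = Parameter("repo_url")
    branch = Parameter("branch", default="main")
    repo_dir = pull_branch(repo_url, branch)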
k
Thanks Anna! So it sounds like you’re suggesting that I have 1 parent flow where the user can input parameters, and those parameters would then be passed into 2 child flows to be run in parallel?
a
Not necessarily a parent flow. My suggestion was to have the repo and branch name as parameters. How you create a flow run from this flow (whether from a parent flow or from a UI) is up to you and your use case
e.g. you may have 2 clocks attached to your schedule (even overlapping clocks) and they may have different parameter defaults attached to them:
Copy code
import datetime

from prefect import Flow
from prefect.schedules import Schedule, clocks

# `now` is assumed to be defined earlier, e.g. pendulum.now("UTC")
clock1 = clocks.IntervalClock(start_date=now,
                              interval=datetime.timedelta(minutes=15),
                              parameter_defaults={"branch": "main"})
clock2 = clocks.IntervalClock(start_date=now,
                              interval=datetime.timedelta(minutes=15),
                              parameter_defaults={"branch": "dev"})

with Flow("dbt", schedule=Schedule(clocks=[clock1, clock2])) as flow:
    ...
k
Oh that’s interesting, could you give me some more information on how the branch parameters from the clock would be passed into 2 separate runs? I don’t quite understand that connection yet
a
This docs page explains it. You can attach multiple clocks to a schedule and give each clock different parameter defaults. Each of those clocks will then generate flow runs with its own parameter values.
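To make the connection concrete, a minimal sketch (Prefect 1.x, assuming the clocks defined above and a hypothetical run_dbt task defined elsewhere):
Copy code
from prefect import Flow, Parameter
from prefect.schedules import Schedule

with Flow("dbt", schedule=Schedule(clocks=[clock1, clock2])) as flow:
    # one scheduled flow run per clock: clock1 injects branch="main",
    # clock2 injects branch="dev"; the default below is only a fallback
    branch = Parameter("branch", default="main")
    run_dbt(branch, command="dbt run")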
k
This is great, thank you so much!
🙌 1
Hi Anna, I’m looking closer into this and I have a couple of questions. As I’ve mentioned before, I want 2 instances of dbt running in parallel, each running with a different schema. I would want the user to be able to input schema_A and schema_B parameters, which would then be passed into the flow to run the 2 dbt instances. In the example you gave me above, you’re able to have 2 schemas for the 2 dbt instances because you have 2 overlapping clocks, each with a different parameter_default. However, I can’t figure out how I would allow users to enter schema_A and schema_B parameters and then have the dbt instances run in parallel with those different parameters. Am I missing something here?
a
You can use a dictionary parameter value with the structure:
Copy code
schemas = Parameter("schemas", default=dict(dev_schema="x", prod_schema="y"))
This way, you can use this single parameter to trigger 2 parallel tasks running dbt run, passing the schemas from this parameter value to downstream tasks. What may help you is to forget about Prefect and think about how you would solve it in plain Python - you would likely have some parametrized script and a for-loop, right? Using this approach, you can map that to Prefect concepts: parameters and mapping
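As a hedged sketch of that idea (again assuming a hypothetical run_dbt task that takes a schema name):
Copy code
from prefect import Flow, Parameter

with Flow("dbt-two-schemas") as flow:
    schemas = Parameter("schemas", default=dict(dev_schema="x", prod_schema="y"))
    # indexing a task result creates a GetItem task in Prefect 1.x, so each schema
    # value flows into its own dbt run task; the two task runs can execute in parallel
    dev = run_dbt(schemas["dev_schema"], command="dbt run")
    prod = run_dbt(schemas["prod_schema"], command="dbt run")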
k
Thank you for the tip Anna. Funny thing is, I learnt Python around the same time I started using Prefect, so sometimes my questions come from a lack of knowledge in both - your and Kevin’s guidance has helped me immensely so far! I currently have something that looks like this:
Copy code
clock1 = clocks.Clock(start_date=now,  
                        parameter_defaults={"schema_name": "stage"})
clock2 = clocks.Clock(start_date=now, 
                        parameter_defaults={"schema_name": "dev"})

with Flow("parallel-dbt-run-flow", run_config=RUN_CONFIG, storage=STORAGE, schedule = Schedule(clocks=[clock1, clock2])) as flow:
    schema_name = Parameter('schema_name', default=None, required=False)

    dbt = run_dbt(
        schema_name,
        command="dbt run",
    )
With your suggestion above, I still don’t quite understand how having a dictionary as the default would then be passed on to my run_dbt task. Should I also be adjusting my clocks?
a
I see, interesting 🙂 perhaps you can use a list of schemas in the parameter task and combine it with mapping?
Copy code
schemas = Parameter("schema_names", default=["stage", "dev"])
dbt = run_dbt.map(
        schemas,
        command=unmapped("dbt run"),
    )
k
Might be a silly question, but how would each clock know which element in the list (“stage” or “dev” in this case) to pick when it runs?
a
you wouldn’t need a separate clock then, because there is only one flow run for both - i.e., you’d be running two task runs in parallel rather than two flow runs in parallel (via overlapping clocks) 🙂 does that make sense?
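One assumption worth calling out, since it isn’t covered above: for the mapped task runs to actually execute in parallel in Prefect 1.x, the flow needs a parallel executor, e.g.:
Copy code
from prefect.executors import LocalDaskExecutor

# without a Dask-based executor, mapped task runs execute sequentially
flow.executor = LocalDaskExecutor()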
k
Oh! So having a list as the default in Parameters + map would run the task once for each element in the default list, in parallel?
Copy code
schemas = Parameter("schema_names", default=["stage", "dev", "dev2"])
dbt = run_dbt.map(
        schemas,
        command=unmapped("dbt run"),
    )
So the above code would run 3 dbt instances in parallel, each one using a different schema name?
a
Exactly!
k
Amazing, thank you! I will test this out. Curious to see how chaotic the logs would look 😳
Oh, that brings up a good point: if I have a downstream task from the run_dbt, and only 1 of the 3 parallel runs fails, I’m assuming that would mean the downstream task would not be triggered for any of the parallel runs?
a
Good point regarding the dbt logs. I would assume they shouldn’t be chaotic at all if you have a downstream task retrieving the logs or sending them somewhere directly from your dbt task. The trigger behavior is configurable; by default, your downstream mapped tasks behave as separate parallel pipelines - this docs page explains it in more detail
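For illustration, a minimal sketch of that default behavior, using hypothetical collect_logs and summarize tasks (not from this thread):
Copy code
from prefect import task
from prefect.triggers import all_finished

@task
def collect_logs(dbt_result):
    # mapped downstream: with the default all_successful trigger, a failed
    # dbt run only skips its own downstream task run, not its siblings
    ...

@task(trigger=all_finished)
def summarize(all_results):
    # reduce step: receives the full list and still runs even if some mapped runs failed
    ...

logs = collect_logs.map(dbt)   # stays mapped: one log task run per schema
summary = summarize(dbt)       # reduces the mapped results into a single task run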
k
Prefect is so cool. If I wanted to add a task before my run_dbt task to pull from a different repo, would mapping still be applicable there? Would each parallel run be a different environment?
😎 1
a
Yes, you could totally do that, and you can pass non-mapped arguments using the unmapped keyword
k
So something like this:
Copy code
branches = Parameter("branch_names", default=["main", "dev_branch"])
schemas = Parameter("schema_names", default=["stage", "dev"])

pull_repo = pull.map(
        branches,
        repo = unmapped(repo_name)
    )
dbt = run_dbt.map(
        schemas,
        command=unmapped("dbt run"),
    )
Each parallel flow would be based on the order of the default list in each parameter (e.g. parallel flow 1 would pull the main branch & run with the stage schema, parallel flow 2 would pull dev_branch & run with the dev schema)?
a
Correct! Nice progress 🙌
❤️ 1
k
Hi again Anna, I created a flow similar to the sample code I sent above and believe I’ve successfully done the mapping. I’m pulling from 2 different branches of the same repo to clone the ‘dbt’ folder, and my issue now is that I’m getting the following error:
ValueError: 'dbt' exists and is not an empty directory
The Clone Dbt[0] task was successful, but the Clone Dbt[1] task is the one giving that error. Does this mean each parallel task is actually not running on a separate environment? (I may be using the term environment incorrectly here)
Also a side question, I noticed that I don’t see the logs of the dbt run at all despite the following arguments being set to true:
Copy code
log_stdout=True,
log_stderr=True,
stream_output=True,
Is there something I must set for mapped dbt runs to output logs? Or will I need to create a downstream task to display that?
@Kevin Kho Some context on what I’m trying to achieve above!
k
Ah, gotcha - will respond in the other thread since Anna will be out