Ken Nguyen
04/09/2022, 4:56 PM

Anna Geller
04/09/2022, 5:00 PM

Ken Nguyen
04/09/2022, 5:10 PM

Anna Geller
04/09/2022, 5:16 PM
clock1 = clocks.IntervalClock(start_date=now,
    interval=datetime.timedelta(minutes=15),
    parameter_defaults={"branch": "main"})
clock2 = clocks.IntervalClock(start_date=now,
    interval=datetime.timedelta(minutes=15),
    parameter_defaults={"branch": "dev"})
with Flow("dbt", schedule=Schedule(clocks=[clock1, clock2])) as flow:
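The two clocks above share one 15-minute cadence but inject different parameter defaults into the same flow. A plain-Python sketch of that idea (the clock list and dbt_flow stub below are illustrative stand-ins, not Prefect's API):

```python
import datetime

# Stand-ins for the two IntervalClocks: same cadence, different defaults.
clock_defaults = [
    {"interval": datetime.timedelta(minutes=15), "parameter_defaults": {"branch": "main"}},
    {"interval": datetime.timedelta(minutes=15), "parameter_defaults": {"branch": "dev"}},
]

def dbt_flow(branch):
    # Stub flow body: report which branch this scheduled run would use.
    return f"dbt run on branch {branch}"

# Each clock tick produces a run with that clock's own defaults.
runs = [dbt_flow(**c["parameter_defaults"]) for c in clock_defaults]
print(runs)
```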
Ken Nguyen
04/09/2022, 5:24 PM

Anna Geller
04/09/2022, 5:38 PM

Ken Nguyen
04/09/2022, 5:54 PM

Anna Geller
04/14/2022, 10:12 AM
schemas = Parameter("schemas", default=dict(dev_schema="x", prod_schema="y"))
This way, you can use this single parameter to trigger 2 parallel tasks running dbt run, passing the schemas from this parameter value to downstream tasks.
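Reduced to plain Python, the single-dict-parameter idea might look like this (run_dbt here is a stand-in stub, not Prefect's dbt task):

```python
def run_dbt(schema, command="dbt run"):
    # Stand-in for the real dbt task: just record what would be executed.
    return f"{command} (schema={schema})"

# One parameter holding both schemas...
schemas = dict(dev_schema="x", prod_schema="y")

# ...fans out into one dbt run per schema value.
results = [run_dbt(schema) for schema in schemas.values()]
print(results)
```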
What may help you is to forget about Prefect and think about how you would solve it in just Python: you would likely have some parametrized script and a for-loop, right? Using this approach, you can map it onto Prefect concepts: parameters and mapping.

Ken Nguyen
04/14/2022, 5:18 PM
clock1 = clocks.Clock(start_date=now,
    parameter_defaults={"schema_name": "stage"})
clock2 = clocks.Clock(start_date=now,
    parameter_defaults={"schema_name": "dev"})
with Flow("parallel-dbt-run-flow", run_config=RUN_CONFIG, storage=STORAGE,
          schedule=Schedule(clocks=[clock1, clock2])) as flow:
    schema_name = Parameter('schema_name', default=None, required=False)
    dbt = run_dbt(
        schema_name,
        command="dbt run",
    )
With your suggestion above, I still don't quite understand how having a dictionary as the default would then be passed on to my run_dbt task. Should I also be adjusting my clocks?

Anna Geller
04/14/2022, 5:28 PM
schemas = Parameter("schema_names", default=["stage", "dev"])
dbt = run_dbt.map(
    schemas,
    command=unmapped("dbt run"),
)
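Roughly, mapping over schemas while wrapping the command in unmapped behaves like calling the task once per list element with the same command reused each time. A plain-Python picture of that (stub function, not the real task):

```python
from itertools import repeat

def run_dbt(schema, command):
    # Stub task: return what this mapped child would execute.
    return (schema, command)

schemas = ["stage", "dev"]

# The mapped argument varies per child; the unmapped one repeats as-is.
results = [run_dbt(s, cmd) for s, cmd in zip(schemas, repeat("dbt run"))]
print(results)
```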
Ken Nguyen
04/14/2022, 5:30 PM

Anna Geller
04/14/2022, 5:33 PM

Ken Nguyen
04/14/2022, 5:56 PM
map would run the task once for each element in the default list, in parallel?
schemas = Parameter("schema_names", default=["stage", "dev", "dev2"])
dbt = run_dbt.map(
    schemas,
    command=unmapped("dbt run"),
)
So the above code would run 3 dbt instances in parallel, each one using a different schema name?

Anna Geller
04/14/2022, 5:57 PM

Ken Nguyen
04/14/2022, 5:59 PM

Anna Geller
04/14/2022, 6:06 PM

Ken Nguyen
04/14/2022, 6:23 PM

Anna Geller
04/14/2022, 6:27 PM
unmapped keyword

Ken Nguyen
04/14/2022, 6:43 PM
branches = Parameter("branch_names", default=["main", "dev_branch"])
schemas = Parameter("schema_names", default=["stage", "dev"])
pull_repo = pull.map(
    branches,
    repo=unmapped(repo_name),
)
dbt = run_dbt.map(
    schemas,
    command=unmapped("dbt run"),
)
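The index-wise pairing being discussed here can be pictured in plain Python: two equal-length lists line up positionally, so child 0 of each mapped task sees the first elements and child 1 the second (an analogy only, using the list values from the snippet above):

```python
branches = ["main", "dev_branch"]
schemas = ["stage", "dev"]

# Positional alignment: pair 0 is ("main", "stage"),
# pair 1 is ("dev_branch", "dev").
pairs = list(zip(branches, schemas))
print(pairs)
```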
Each parallel flow would be based on the order of the default list in each parameter (e.g. parallel flow 1 would pull the main branch & run with the stage schema, parallel flow 2 would pull dev_branch & run with the dev schema)?

Anna Geller
04/14/2022, 6:45 PM

Ken Nguyen
04/14/2022, 8:59 PM
ValueError: 'dbt' exists and is not an empty directory
The Clone Dbt[0] task was successful, but the Clone Dbt[1] task is the one giving that error. Does this mean each parallel task is actually not running in a separate environment? (I may be using the term "environment" incorrectly here.)
log_stdout=True,
log_stderr=True,
stream_output=True,
Is there something I must set for mapped dbt runs to output logs? Or will I need to create a downstream task to display that?

Kevin Kho
04/14/2022, 11:33 PM
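The ValueError above suggests both mapped clone tasks targeted the same dbt directory on a shared filesystem, so the second clone found the folder already populated. One way to sidestep that kind of collision, sketched in plain Python (the clone_target helper is hypothetical, not from the thread), is to derive a distinct directory per mapped input:

```python
import os
import tempfile

def clone_target(base_dir, branch):
    # Hypothetical helper: a distinct clone directory per branch, so
    # parallel mapped clones cannot collide on one shared "dbt" folder.
    return os.path.join(base_dir, f"dbt-{branch}")

base = tempfile.mkdtemp()
targets = [clone_target(base, b) for b in ["main", "dev_branch"]]
for t in targets:
    os.makedirs(t)  # would raise if two branches mapped to the same path
print(targets)
```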