Anna Geller (04/09/2022):
import datetime
import pendulum
from prefect import Flow
from prefect.schedules import Schedule, clocks

now = pendulum.now(tz="UTC")
clock1 = clocks.IntervalClock(start_date=now,
                              interval=datetime.timedelta(minutes=15),
                              parameter_defaults={"branch": "main"})
clock2 = clocks.IntervalClock(start_date=now,
                              interval=datetime.timedelta(minutes=15),
                              parameter_defaults={"branch": "dev"})
with Flow("dbt", schedule=Schedule(clocks=[clock1, clock2])) as flow:
    ...
Anna Geller (04/14/2022):
schemas = Parameter("schemas", default=dict(dev_schema="x", prod_schema="y"))
This way, you can use this single parameter to trigger two parallel tasks running dbt run, passing the schemas from this parameter value to downstream tasks.
What may help is to forget about Prefect for a moment and think about how you would solve it in plain Python: you would likely have a parametrized script and a for loop, right? From there, you can map this onto Prefect concepts: parameters and mapping.
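To make that translation concrete, here is a small sketch of both sides; the run_dbt task name and the schema-to-command wiring are assumptions for illustration, not the exact task from this thread:

# plain Python: a parametrized script plus a for loop
for schema in ["dev_schema", "prod_schema"]:
    print(f"dbt run (schema: {schema})")

# the same idea with Prefect 1.x parameters and mapping
from prefect import Flow, Parameter, task, unmapped

@task
def run_dbt(schema_name, command):
    print(f"{command} (schema: {schema_name})")

with Flow("dbt-mapped") as flow:
    schemas = Parameter("schema_names", default=["dev_schema", "prod_schema"])
    run_dbt.map(schemas, command=unmapped("dbt run"))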
Ken Nguyen (04/14/2022, 5:18 PM):

clock1 = clocks.Clock(start_date=now,
                      parameter_defaults={"schema_name": "stage"})
clock2 = clocks.Clock(start_date=now,
                      parameter_defaults={"schema_name": "dev"})
with Flow("parallel-dbt-run-flow", run_config=RUN_CONFIG, storage=STORAGE,
          schedule=Schedule(clocks=[clock1, clock2])) as flow:
    schema_name = Parameter("schema_name", default=None, required=False)
    dbt = run_dbt(
        schema_name,
        command="dbt run",
    )
With your suggestion above, I still don't quite understand how having a dictionary as the default would then be passed on to my run_dbt task. Should I also be adjusting my clocks?

Anna Geller (04/14/2022):
schemas = Parameter("schema_names", default=["stage", "dev"])
dbt = run_dbt.map(
schemas,
command=unmapped("dbt run"),
)Ken Nguyen
Ken Nguyen (04/14/2022, 5:56 PM):

map would run the task once for each element in the default list, in parallel?
Ken Nguyen (04/14/2022, 5:56 PM):

schemas = Parameter("schema_names", default=["stage", "dev", "dev2"])
dbt = run_dbt.map(
    schemas,
    command=unmapped("dbt run"),
)
So the above code would run 3 dbt instances in parallel, each one using a different schema name?
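For context, mapping creates one child task per list element, but the children only run concurrently when the flow uses a parallel executor. A minimal sketch assuming Prefect 1.x with a LocalDaskExecutor, and with run_dbt stubbed out as a placeholder:

from prefect import Flow, Parameter, task, unmapped
from prefect.executors import LocalDaskExecutor

@task
def run_dbt(schema_name, command):
    # placeholder standing in for the real dbt task in this thread
    print(f"{command} (schema: {schema_name})")

with Flow("parallel-dbt", executor=LocalDaskExecutor()) as flow:
    schemas = Parameter("schema_names", default=["stage", "dev", "dev2"])
    dbt = run_dbt.map(schemas, command=unmapped("dbt run"))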
Anna Geller (04/14/2022):
unmapped keyword
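For reference, unmapped wraps an argument so every mapped child receives it whole instead of iterating over it; a small sketch:

from prefect import Flow, task, unmapped

@task
def combine(x, const):
    return f"{x}-{const}"

with Flow("unmapped-demo") as flow:
    # x is mapped over the list; const is passed unchanged to all three children
    combine.map(x=[1, 2, 3], const=unmapped("fixed"))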
Ken Nguyen (04/14/2022, 6:43 PM):

branches = Parameter("branch_names", default=["main", "dev_branch"])
schemas = Parameter("schema_names", default=["stage", "dev"])
pull_repo = pull.map(
    branches,
    repo=unmapped(repo_name),
)
dbt = run_dbt.map(
    schemas,
    command=unmapped("dbt run"),
)
Each parallel flow would be based on the order of the default list in each parameter (e.g. parallel flow 1 would pull the main branch and run with the stage schema, parallel flow 2 would pull dev_branch and run with the dev schema)?
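For context, pairing by position applies when the iterables are mapped within a single .map call, where Prefect zips them elementwise; a small sketch:

from prefect import Flow, task

@task
def pair(branch, schema):
    # children receive ("main", "stage") and ("dev_branch", "dev")
    print(f"{branch} -> {schema}")

with Flow("zip-demo") as flow:
    pair.map(branch=["main", "dev_branch"], schema=["stage", "dev"])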
Ken Nguyen (04/14/2022, 8:59 PM):

ValueError: 'dbt' exists and is not an empty directory
The Clone Dbt[0] task was successful, but the Clone Dbt[1] task is the one giving that error. Does this mean each parallel task is actually not running in a separate environment? (I may be using the term environment incorrectly here.)
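One way to avoid this kind of collision, sketched with a hypothetical clone_repo helper (the task name, the git CLI call, and the repo URL are all assumptions, not the task from this thread): give each mapped child its own clone directory so parallel children on the same machine don't all write to ./dbt:

from prefect import Flow, task, unmapped

@task
def clone_repo(branch, repo_url):
    import subprocess
    target_dir = f"dbt_{branch}"  # unique directory per mapped child
    subprocess.run(
        ["git", "clone", "--branch", branch, repo_url, target_dir],
        check=True,
    )
    return target_dir

with Flow("clone-per-branch") as flow:
    clone_repo.map(
        branch=["main", "dev_branch"],
        repo_url=unmapped("https://github.com/org/repo.git"),  # placeholder URL
    )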
04/14/2022, 9:20 PMlog_stdout=True,
log_stderr=True,
stream_output=True,
Is there something I must set for mapped dbt runs to output logs? Or will I need to create a downstream task to display that?Ken Nguyen
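For reference, these flags are constructor arguments on ShellTask and its DbtShellTask subclass in Prefect 1.x (stream_output assumes a release recent enough to support it); mapped children share the same task instance, so a sketch like this should apply to each child run:

from prefect import Flow
from prefect.tasks.dbt import DbtShellTask

dbt_task = DbtShellTask(
    log_stdout=True,      # send stdout to the Prefect logger
    log_stderr=True,      # send stderr to the Prefect logger
    stream_output=True,   # log lines as they are produced
)

with Flow("dbt-logging") as flow:
    dbt_task(command="dbt run")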