Wei Mei
06/29/2022, 9:21 PM
@task(nout=1)
def determine_run(clock):
if clock == "full_run":
logger.info("full_run clock")
run_type = "full"
logger.info(f"{run_type}")
return run_type
Jessica Smith
06/29/2022, 9:44 PMWei Mei
06/29/2022, 9:49 PM
dbt_run = dbt(
command=f"dbt run -s {run_type.result}",
task_args={"name": f"DBT Run: {run_type.result}"},
upstream_tasks=[deps_output])
Jessica Smith
06/29/2022, 9:53 PMWei Mei
06/29/2022, 9:55 PM
@task(name="Determine run type", checkpoint=True)
def determine_run(clock):
if clock == "full_run":
logger.info("full_run clock")
run_type = "small_exchange"
logger.info(f"{run_type}")
return run_type
@task(name="Determine run type", checkpoint=True)
def determine_run(clock):
if clock == "full_run":
logger.info("full_run clock")
run_type = "full"
logger.info(f"{run_type}")
return run_type
@task(name="Print run result")
def print_result(x):
logger.info(x)
as flow:
clock = Parameter(name="clock", default="full_run")
run_type = determine_run(clock=clock)
print = print_result(run_type)
dbt_run = dbt(
command=f"dbt run -s {run_type.result}",
task_args={"name": f"DBT Run: {run_type.result}"},
upstream_tasks=[deps_output])
Jessica Smith
06/29/2022, 9:57 PMWei Mei
06/29/2022, 9:58 PMJessica Smith
06/29/2022, 9:58 PMWei Mei
06/29/2022, 9:58 PMJessica Smith
06/29/2022, 10:01 PMWei Mei
06/29/2022, 10:02 PM
<Task: Determine run type>
Jessica Smith
06/29/2022, 10:03 PMWei Mei
06/29/2022, 10:05 PMJessica Smith
06/29/2022, 10:07 PM