V
# 10/06/2022, 8:01 PM
def task_orchestrator(tbl):
    """Create the mapped build task for one table spec and wire its dependencies.

    Must be called inside an active ``Flow`` context so the created tasks are
    registered with that flow.

    Args:
        tbl: mapping with a ``"tables"`` list of table names to build and a
            ``"depends_on"`` list of table names that must finish first
            (may be empty).

    Returns:
        The mapped ``build_table`` task created for ``tbl["tables"]``.
    """
    table = tbl["tables"]
    depends_on = tbl["depends_on"]
    table_task = build_table.map(tbl=table)
    if depends_on:
        # NOTE(review): this maps a *new* build_table task over the dependency
        # names, so a dependency looks like it is built again here instead of
        # reusing the task created for its own spec (e.g. "dim1" would run once
        # per spec that lists it). Confirm this is intended.
        task_dependencies = build_table.map(tbl=depends_on)
        # Task.set_dependencies updates the flow graph in place and returns
        # None -- do not assign its result back to table_task.
        table_task.set_dependencies(upstream_tasks=[task_dependencies])
    return table_task
@task(
    log_stdout=True,
    max_retries=2,
    retry_delay=datetime.timedelta(seconds=10),
)
def build_table(tbl):
    """Simulate building one table, then hand its name back.

    The task is retried up to twice, 10 seconds apart, and its stdout is
    captured in the task logs.

    Args:
        tbl: name of the table to build, or None to do nothing.

    Returns:
        ``tbl`` unchanged (None when nothing was built).
    """
    # Nothing to build: skip the simulated work entirely.
    if tbl is None:
        return None
    print(f"{tbl} is triggered!!")
    time.sleep(5)  # stand-in for the real build work
    print(f"{tbl} is done!!")
    return tbl
with Flow("process_tables", executor=DaskExecutor()) as flow:
    # One spec per table: what to build and which tables it depends on.
    dependencies = [
        {"tables": ["dim4"], "depends_on": ["dim1"]},
        {"tables": ["dim1"], "depends_on": []},
        {"tables": ["fact2"], "depends_on": ["dim2"]},
        {"tables": ["fact1"], "depends_on": ["dim2", "dim1"]},
        {"tables": ["dim2"], "depends_on": []},
        {"tables": ["dim3"], "depends_on": []},
    ]
    # Create the tasks eagerly while the Flow context is still active, so
    # every task is registered with `flow`.
    result = [task_orchestrator(spec) for spec in dependencies]
    print(result)


if __name__ == "__main__":
    flow.run()
Anna Geller
10/07/2022, 3:24 PM
V
10/07/2022, 3:27 PM
Anna Geller
10/07/2022, 3:30 PM