Hello everyone, I’m trying to process tables in an...
# prefect-community
Hello everyone, I’m trying to process tables in an execution order defined in the dependency list, made some progress, however I’m not sure how I can refer to an existing task without creating one each time, using Prefect V1, sample code in the 🧵
Copy code
def task_orchestrator(tbl):
	table = tbl["tables"]
	depends_on = tbl["depends_on"]
	table_task = build_table.map(tbl=table)
	if depends_on:
		task_dependencies = build_table.map(tbl=depends_on)
		table_task = table_task.set_dependencies(upstream_tasks=[task_dependencies])
	return table_task

@task(log_stdout=True, max_retries=2,retry_delay=datetime.timedelta(seconds=10))
def build_table(tbl):
	if tbl is not None:
		print(f"{tbl} is triggered!!")
		print(f"{tbl} is done!!")
		return tbl

with Flow("process_tables",executor=DaskExecutor()) as flow:
	dependencies = [
					{"tables": ["dim4"], "depends_on": ["dim1"]},
	                {"tables": ["dim1"], "depends_on": []},
					{"tables": ["fact2"], "depends_on": ["dim2"]},
					{"tables": ["fact1"], "depends_on": ["dim2", "dim1"]},
	                {"tables": ["dim2"], "depends_on": []},
	                {"tables": ["dim3"], "depends_on": []},
	result = map(task_orchestrator, dependencies)
if __name__ == "__main__":
@Anna Geller can you get someone from prefect to hopefully answer this (sorry about direct mention)
this looks familiar, are you from VV? if so I believe @Rob Freedy is already involved in working on a solution for this with you
yes, hoping to get more ideas on this so I could implement before the meeting and validate with Rob
it's best to focus on the problem you are trying to solve -- if the problem is to avoid boilerplate in building data warehousing workflows, then this problem can likely be solved more easily via custom abstraction rather than config-based DAG definitions, please chat with Rob since he may give you more actionable advice on this
👍 1