David Elliott
10/10/2022, 4:36 PM
Zanie
from prefect import flow, Flow, task

@task
def foo(x):
    return 1

def build_flow() -> Flow:
    # One named copy of the task per step
    tasks = [foo.with_options(name=f"task_{i}") for i in range(10)]

    @flow
    def my_flow():
        # `tasks` is a list, so iterate over it directly
        for task in tasks:
            task("test")

    return my_flow

my_flow = build_flow()
David Elliott
10/10/2022, 4:56 PM
upstreams list of tasks as a task argument in order to form the dependencies actually..? Any chance you’d be able to extend it slightly please? Sorry if I’m missing the obvious!
add_task() and then afterwards set the edges with set_dependency(), but with the functionised / decorator approach, I’m not sure how I can dynamically pass in an object which hasn’t necessarily been defined yet..?
ie not sure how to write this dynamically:
task1 = foo(name="t1", upstreams=[])
task2 = foo(name="t2", upstreams=[task1])
Zanie
my_network_graph = {
    "task_1" : [],
    "task_2" : [],
    "task_3" : ["task_1", "task_2"],
    "task_4" : ["task_3"],
    "task_5" : ["task_4"],
}

from prefect import flow, Flow, task

@task
def foo(x):
    return 1

def build_flow(network) -> Flow:
    tasks = {}
    for name in my_network_graph.keys():
        tasks[name] = foo.with_options(name=name)

    @flow
    def my_flow():
        for task, upstreams in tasks.items():
            task.submit(wait_for=[tasks[name] for name in upstreams])

    return my_flow

my_flow = build_flow(my_network_graph)
David Elliott
10/10/2022, 6:11 PM
wait_for=[] rather than the task futures which are returned by task.submit(), so when it’s run it basically just runs 5x unrelated tasks.
I actually don’t need to pass data between them but I do need to set the dependencies and so need to pass the futures into downstream tasks.
However, given the above issue (not passing futures) I actually don’t think it’s possible to dynamically generate flows anymore in 2.0. The only thing I can think of is to dynamically write the flow definition to a python file (like with open("flow.py", "w") as f and f.write()) and then call / import that python file…?
my_network_graph = {
    "task_1" : [],
    "task_2" : [],
    "task_3" : ["task_1", "task_2"],
    "task_4" : ["task_3"],
    "task_5" : ["task_4"],
}

from prefect import flow, Flow, task

@task
def foo(task_name):
    print(task_name)

def build_flow(network) -> Flow:
    tasks = {}
    for name in network.keys():
        tasks[name] = foo.with_options(name=name)

    @flow(name="demo_dynamic")
    def my_flow():
        for task_name, task in tasks.items():
            upstream_task_names = network[task_name]
            upstream_tasks = [tasks[u] for u in upstream_task_names]
            task.submit(task_name=task_name, wait_for=upstream_tasks)

    return my_flow

my_flow = build_flow(my_network_graph)

if __name__ == "__main__":
    my_flow()
Zanie
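@flow
def my_flow():
    # Keep the future returned by each submit() so downstream tasks can wait on it.
    # `network` and `tasks` come from the enclosing build_flow(), as in the code above.
    futures = {}
    for name, upstreams in network.items():
        futures[name] = tasks[name].submit(
            task_name=name, wait_for=[futures[u] for u in upstreams]
        )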
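One thing to watch with the loop above: it looks up each upstream's future in futures, so it only works if every upstream was submitted earlier in the iteration. Below is a minimal sketch of one way to guarantee that ordering, using only the standard library. The helper names submission_order and run_network are hypothetical, and ThreadPoolExecutor is just a stand-in for Prefect's task.submit(..., wait_for=[...]) so the snippet runs without Prefect installed; it is not how Prefect schedules tasks internally.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Same graph as in the thread, deliberately listed out of order.
my_network_graph = {
    "task_5": ["task_4"],
    "task_4": ["task_3"],
    "task_3": ["task_1", "task_2"],
    "task_1": [],
    "task_2": [],
}


def submission_order(network):
    """Return task names so that every upstream precedes its downstreams."""
    # TopologicalSorter expects {node: predecessors}, which matches the
    # {task_name: upstream_names} shape used in the thread.
    return list(TopologicalSorter(network).static_order())


def run_network(network):
    """Stand-in for the flow body: submit in order, waiting on upstream futures."""
    finished = []  # completion order, for illustration only

    def work(name, upstream_futures):
        wait(upstream_futures)  # plays the role of wait_for=[...]
        finished.append(name)
        return name

    futures = {}
    # One worker per task, so a task blocked on its upstreams cannot starve them.
    with ThreadPoolExecutor(max_workers=len(network)) as pool:
        for name in submission_order(network):
            upstream_futures = [futures[u] for u in network[name]]
            futures[name] = pool.submit(work, name, upstream_futures)
    return finished


if __name__ == "__main__":
    print(submission_order(my_network_graph))
    print(run_network(my_network_graph))
```

In the Prefect version, the same submission_order(network) could drive the for loop inside my_flow, with pool.submit replaced by tasks[name].submit(task_name=name, wait_for=upstream_futures).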
David Elliott
10/10/2022, 6:25 PM
Zanie
David Elliott
10/10/2022, 9:08 PM
Zanie
David Elliott
10/12/2022, 10:36 AM
Zanie