#prefect-community

David Elliott

10/10/2022, 4:36 PM
Hey all, wondering if anyone has any idea how to generate/write a flow dynamically in 2.0, e.g. add tasks + dependencies from another graph object? In 1.0 it used to be possible to just instantiate a flow object and then add tasks / dependencies to it (e.g. whilst iterating over a dict or a NetworkX graph). However, now that flows are decorators on functions, I’m struggling to see how I’d do that. I’ll attach some example code in the 🧵 that shows what I’m trying to achieve. Thanks in advance for any suggestions!
Attachment: eg code.py

Zanie

10/10/2022, 4:42 PM
Perhaps something like this?
from prefect import flow, Flow, task

@task
def foo(x):
    return 1


def build_flow() -> Flow:

    tasks = [foo.with_options(name=f"task_{i}") for i in range(10)]

    @flow
    def my_flow():
        # `tasks` is a list, so iterate over it directly (and avoid shadowing the imported `task`)
        for t in tasks:
            t("test")

    return my_flow


my_flow = build_flow()
👀 1
I didn’t implement the network, but do you see how you could do so?

David Elliott

10/10/2022, 4:56 PM
Thanks! I’m struggling to see how to pass in the `upstreams` list of tasks as a task argument in order to form the dependencies, actually. Any chance you’d be able to extend it slightly please? Sorry if I’m missing the obvious!
I think the difficulty is that I need to be able to pass the result of one task (a PrefectFuture) as an argument to the next task, which I’m not sure is possible at Python runtime. Previously I could add the task to the flow with `add_task()` and then afterwards set the edges with `set_dependency()`, but with the functionised / decorator approach I’m not sure how I can dynamically pass in an object which hasn’t necessarily been defined yet. I.e. I’m not sure how to write this dynamically:
task1 = foo(name="t1", upstreams=[])
task2 = foo(name="t2", upstreams=[task1])

Zanie

10/10/2022, 5:47 PM
Untested, but like…
my_network_graph = {
  "task_1" : [],
  "task_2" : [],
  "task_3" : ["task_1", "task_2"],
  "task_4" : ["task_3"],
  "task_5" : ["task_4"],
}

from prefect import flow, Flow, task

@task
def foo(x):
    return 1


def build_flow(network) -> Flow:
    tasks = {}
    for name in my_network_graph.keys():
        tasks[name] = foo.with_options(name=name)

    @flow
    def my_flow():
        for task, upstreams in tasks.items():
            task.submit(wait_for=[tasks[name] for name in upstreams])


    return my_flow


my_flow = build_flow(my_network_graph)
👀 1
If you’re trying to actually pass the data you’re basically going to have to implement some sort of DAG walker for your network, which sounds painful. Like:
• Run all tasks without upstreams, store their futures
• Find all tasks with upstreams fulfilled, run them and store their futures
• Repeat…
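The walker steps above could be sketched roughly as follows. This is only an illustration in plain Python (3.9+ for `graphlib`), not Prefect code: `walk_dag`, `run_node`, and the `submit` callable are hypothetical names, and a `ThreadPoolExecutor` stands in for whatever actually submits the tasks.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter

# Same shape as the graph above: task name -> names of its upstream tasks.
network = {
    "task_1": [],
    "task_2": [],
    "task_3": ["task_1", "task_2"],
    "task_4": ["task_3"],
    "task_5": ["task_4"],
}


def walk_dag(network, submit):
    """Submit each node once all of its upstreams have finished.

    `submit(name, upstream_results)` is a hypothetical callable that must
    return an object with a blocking `.result()` method (a future).
    """
    sorter = TopologicalSorter(network)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    futures = {}
    waves = []
    while sorter.is_active():
        ready = list(sorter.get_ready())  # nodes whose upstreams are all done
        waves.append(sorted(ready))
        for name in ready:
            # Upstreams finished in an earlier wave, so .result() returns at once.
            upstream_results = {u: futures[u].result() for u in network[name]}
            futures[name] = submit(name, upstream_results)
        for name in ready:
            futures[name].result()  # block until this wave has finished
            sorter.done(name)  # unlocks the nodes that depend on it
    return waves, {name: fut.result() for name, fut in futures.items()}


def run_node(name, upstream_results):
    # Stand-in for real work; reports which upstream results it received.
    return f"{name} ran after {sorted(upstream_results)}"


with ThreadPoolExecutor() as pool:
    waves, results = walk_dag(
        network, lambda name, ups: pool.submit(run_node, name, ups)
    )
```

Waiting for a whole wave before starting the next is the simplest version of "repeat"; a finer-grained walker would mark each node done as soon as its own future completes.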

David Elliott

10/10/2022, 6:11 PM
I’m afraid the above code doesn’t work, sadly, as it’s passing the un-called tasks in `wait_for=[]` rather than the task futures returned by `task.submit()`, so when it’s run it basically just runs 5 unrelated tasks. I actually don’t need to pass data between them, but I do need to set the dependencies and so need to pass the futures into downstream tasks. However, given the above issue (not passing futures), I don’t think it’s possible to dynamically generate flows anymore in 2.0. The only thing I can think of is to dynamically write the flow definition to a Python file (like with `open("flow.py", "w") as f` and `f.write()`) and then call / import that Python file…?
I modified your above example in testing, this is what I ended up with:
my_network_graph = {
  "task_1" : [],
  "task_2" : [],
  "task_3" : ["task_1", "task_2"],
  "task_4" : ["task_3"],
  "task_5" : ["task_4"],
}

from prefect import flow, Flow, task

@task
def foo(task_name):
    print(task_name)


def build_flow(network) -> Flow:
    tasks = {}
    for name in network.keys():
        tasks[name] = foo.with_options(name=name)

    @flow(name="demo_dynamic")
    def my_flow():
        for task_name, task in tasks.items():
            upstream_task_names = network[task_name]
            upstream_tasks = [tasks[u] for u in upstream_task_names]
            task.submit(task_name=task_name, wait_for=upstream_tasks)

    return my_flow


my_flow = build_flow(my_network_graph)

if __name__ == "__main__":
    my_flow()
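For reference, the "write the flow definition to a Python file" idea mentioned above might look something like this. It is a rough sketch only: `render_flow_source` and `HEADER` are hypothetical names, it assumes every task name is a valid Python identifier, and it only syntax-checks the generated text rather than importing or running it. The generated source uses the same `with_options`, `submit`, and `wait_for` calls as the examples in this thread.

```python
from graphlib import TopologicalSorter

network = {
    "task_1": [],
    "task_2": [],
    "task_3": ["task_1", "task_2"],
    "task_4": ["task_3"],
    "task_5": ["task_4"],
}

# Fixed preamble of the generated file (mirrors the example above).
HEADER = '''from prefect import flow, task


@task
def foo(task_name):
    print(task_name)


@flow(name="demo_dynamic")
def my_flow():
'''


def render_flow_source(network):
    """Return Python source in which each task waits on its upstream futures."""
    lines = [HEADER]
    # static_order() yields every node after all of its upstreams, so each
    # future variable is assigned before any line that references it.
    for name in TopologicalSorter(network).static_order():
        upstreams = ", ".join(network.get(name, []))
        lines.append(
            f'    {name} = foo.with_options(name="{name}")'
            f'.submit("{name}", wait_for=[{upstreams}])\n'
        )
    return "".join(lines)


source = render_flow_source(network)
compile(source, "flow.py", "exec")  # syntax check only; nothing is executed
```

Writing `source` out with `open("flow.py", "w")` would then give an importable module, at the cost of maintaining generated code.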

Zanie

10/10/2022, 6:14 PM
Ah my bad yeah you can pass futures as long as your graph doesn’t have references out of order
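    @flow
    def my_flow():
        futures = {}
        # Assumes `network` lists every task after all of its upstreams
        for name, upstreams in network.items():
            # Pass the upstream *futures* (not the un-called tasks) to wait_for
            futures[name] = tasks[name].submit(
                name, wait_for=[futures[u] for u in upstreams]
            )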
Otherwise it’s more complicated. Can you explain why you’re trying to define your whole structure before beginning work? Dynamic flows are way simpler in v2: you can literally have a for loop that creates tasks in the flow itself and uses runtime logic. I’m struggling to see the value of this particular form of dynamism.
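One way around the "references out of order" caveat is to sort the graph topologically before looping, so every upstream future exists by the time it is needed. A minimal sketch, assuming Python 3.9+ for `graphlib`; `submit_in_order` and `fake_submit` are hypothetical stand-ins, with `fake_submit` playing the role of `tasks[name].submit(...)`.

```python
from graphlib import TopologicalSorter

# Deliberately out of order: task_3 is listed before its upstreams.
network = {
    "task_3": ["task_1", "task_2"],
    "task_1": [],
    "task_2": [],
}


def submit_in_order(network, submit):
    """Call `submit` for every node, upstreams first, passing upstream futures."""
    futures = {}
    # static_order() yields each node only after all of its upstreams.
    for name in TopologicalSorter(network).static_order():
        upstream_futures = [futures[u] for u in network.get(name, [])]
        futures[name] = submit(name, wait_for=upstream_futures)
    return futures


calls = []


def fake_submit(name, wait_for):
    # Records the call and returns a placeholder instead of a real future.
    calls.append((name, list(wait_for)))
    return f"future:{name}"


futures = submit_in_order(network, fake_submit)
```

Iterating the dict directly would raise a `KeyError` on `task_3` here, since its upstream futures would not exist yet.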

David Elliott

10/10/2022, 6:25 PM
Yeah for sure - essentially we have an existing static DAG (defined in our git repo as YAML) which needs executing. In Prefect 1.0 we convert that YAML to a Prefect flow at CI build time (adding nodes + edges) and register it, then Prefect runs that DAG for us. We’re essentially forced to move to Prefect 2.0 asap as we’re up at 2850 DAG nodes and your upper limit is 3000 (GQL limitations), so I’m trying to port this existing static-DAG setup over to 2.0 as soon as possible. Like you say, I could try and write something to execute the DAG at runtime (i.e. walk the network in a loop, submitting tasks for execution as & when their upstreams complete), but I was hoping to avoid needing to do that, given we already know the structure of our DAG and just need it to be executed…! In terms of the tasks themselves, they’re purely “run this SQL file” tasks: no data needs to be passed between them, we just need to maintain the dependencies per our DAG file.

Zanie

10/10/2022, 6:35 PM
Interesting! I've got some high priority work so I can't look into a more complete example right now but we should be able to handle this without too much complexity
🙌 1

David Elliott

10/10/2022, 9:08 PM
Thanks! lmk if you have any other thoughts on it 🙂

Zanie

10/10/2022, 9:10 PM
I'm also under the impression that we've about doubled the maximum size supported in v1
I'm not sure what limits you're encountering / where, but I recently did some work on Cloud to bump the maximum size of flows.

David Elliott

10/12/2022, 10:36 AM
There’s a hard limit of 3000 nodes in a given flow (something to do with the GQL backend / API rate limits?). Initially we hit the 2500 limit and your team upped it to 3k, but said it basically can’t go any higher.

Zanie

10/12/2022, 4:49 PM
Ah yes sorry I raised it from 2500 to 3000
I think we contemplated bumping it higher, but we are indeed concerned about encountering performance issues after monitoring the earlier bump.
👍 1