Hey there, I’ve got a question for which I’m sure ...
# ask-community
w
Hey there, I’ve got a question for which I’m sure there’s a solution. I’ve got a flow that is scheduled to run daily. It’s job is to run other flows that have been added to prefect over time (e.g. our system lets people register instances of flows). However, when this top-level flow gets registered, prefect builds a graph that only knows about all the lower-level flows that had been previously registered and doesn’t recognize changes. So, I tried creating a task called “run_dymamic” that could kick off other tasks, but nothing happens. Is there a way to create a flow that can “change” after its been registered? Basically, when the flow gets executed, I want that instance’s run to modify its graph to data it finds in a database, for example. I’m trying my best not to be abstract in my way of speaking.
k
Hey @William Grim, so it is possible to run a flow that changes, but this is limited to having a consistent execution graph. For example, you can store the Flow as a script and it will be downloaded during run time. If the logic inside a task changes, that should be fine but if another task changes, you would need to re-register. The avenue Prefect provides for runtime dynamicism is through mapping and task looping. So your options are to have a task that retrieves relevant flows, and then map over them or to loop over executing them.
w
@Kevin Kho that makes sense, and our flows are indeed “static” on the server. The only thing that changes is clients can register instances of flows in our system that have different params passed in. Do you have some code or a link describing a bit more concretely how you would do what you’re saying?
k
It would be something like this:
Copy code
@task
def run_one_flow(info):
    StartFlowRun(**info).run()

with Flow("...") as flow:
    flows_to_run = ...()
    run_one_flow.map(flows_to_run)
Where
StartFlowRun
is the Prefect task in the task library. Does that make sense?
or
Copy code
@task
def run_all_flows(info):
    for flow in info:
        StartFlowRun(**flow).run()

with Flow("...") as flow:
    flows_to_run = ...()
    run_one_flow(flows_to_run)
w
oh, that’s pretty similar to what i had, except i am juuuust enough different that it didn’t work. let me modify to the style you have and try again. i appreciate the critique!
(will try after dinner)
c
Sorry to interrupt but FYI we are actively working on some changes that will make these dynamic tasks much easier to define and run 😄
w
@Chris White that sounds great! I had this feeling that this was an oft-requested feature. But I’m looking forward to trying Kevin’s ideas in my code in a few minutes.
😄 1
I never followed up on this. We got this working by kicking off the flows, but they are async. I added some code to check the state of those flows every so often before a local task unblocks and returns successful.
If we used
wait=True
, that also worked but put everything into a serial flow pipeline, which is why I had to do the other thing.
k
That makes sense. Thanks for circling back!