https://prefect.io logo
#prefect-community
Title
# prefect-community
g

Geoff Coyner

11/23/2022, 1:31 AM
Hi, I'm creating a flow-of-flows and was wondering if anyone had any recommendations for my scenario. Background: My parent flow has potential to create a lot of child flows, with the precise number (N) determined at run-time. To handle this, I've been using mapping. However I've noticed
create_flow_run.map
will create all of flow runs at once, which is clogging up my agent's backlog when N gets too large. This creates a lot of failures, which occur before my child flow runs can even begin. Unfortunately, I am not the admin and don't have control over the agent we're using, so I don't have a way to easily investigate why this is or resolve it. Even if I was, it seems like kicking off this many flows at once probably isn't a good idea. Thus, I'm wondering if there is some way to throttle
create_flow_run
my flow without pre-defining my N child flows. I'm even OK with running all N flows in sequence. I.e. kick off flow run 1 -> wait for flow run 1 -> kick off flow run 2 -> etc. Here is the structure I'm using now:
Copy code
def create_wait_on_child(parameters):
    create_id = create_flow_run(flow_name="GenericChild",
                                project_name="Project",
                                parameters=parameters)
    return wait_for_flow_run(create_id, 
                             raise_final_state=True)

with Flow('Parent Flow') as flow:
    flow_runs_params_list = get_flow_runs_params()
    apply_map(schedule_run_backfill, flow_runs_params_list)
My Question: Is there some way to combine the functionality of`create_flow_run` and
wait_for_flow_run
in the same task? I know I can't just add a
task()
decorator to
create_wait_on_child
in the example above (since that would involve tasks with tasks), but that is the functionality I'm going for. Or does anyone have alternative recommendations? Appreciate anyone's input.
👀 1
Also, we're still on Prefect 1 (in case the
apply_map
wasn't a give-away)
z

Zanie

11/23/2022, 3:46 PM
The older
StartFlowRun
task combines the creation and waiting operations.
I’m not sure of a great way to throttle without concurrency limits from Prefect Cloud.
You could split your
flow_runs_params_list
into batches then map over the batches?
e

Erik Mathis

11/23/2022, 3:56 PM
maybe deque would help here?
g

Geoff Coyner

11/23/2022, 5:59 PM
I got
StartFlowRun
to work. Thanks for the suggestion!
2 Views