Sean Talia
01/12/2023, 2:22 PM
StartFlowRun tasks at runtime? My scenario is that I want a flow (call it orchestratorFlow) to orchestrate the execution of several instances of another flow that I have (call that one ecsFlow). The issue at hand is, I don't know how many instances of my ecsFlow I need to kick off until orchestratorFlow runtime; it could be 1, 8, or 50+ (each flow run of ecsFlow would be run with a different set of parameters), and would be determined based on a parameter passed to orchestratorFlow.
I've tried to follow the Prefect v1 map paradigm by doing StartFlowRun.map([<config_parameters>]), where [<config_parameters>] is a dynamically generated list of flow run kwargs returned by an upstream task, but that's not doing the trick. Thank you for any help!
Matthew Scanlon
01/12/2023, 3:31 PM
prefect.tasks.prefect.create_flow_run?
James Sopkin
01/12/2023, 3:37 PM
orchestrator flow iterate through the parameters, kicking off a new child flow for each set of params using create_flow_run. I can type up some pseudo code if that would help

from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run

@task
def generate_params():
    # do something here to build one parameters dict per child flow run
    return [{"num": 1, "letter": "a"}, {"num": 2, "letter": "b"}, {"num": 3, "letter": "c"}]

with Flow("orchestrator") as flow:
    params = generate_params()
    # map over the parameter dicts; the same flow_id is reused for every child run
    create_flow_run.map(flow_id=unmapped("flow_id_here"), parameters=params)
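The "do something here" step is where the orchestrator's input gets turned into one parameters dict per child run. A minimal sketch of that step in plain Python, assuming the orchestrator parameter is a list of job names plus an environment string (both names, and the function build_child_run_kwargs, are hypothetical and not from the thread):

```python
from typing import Any, Dict, List


def build_child_run_kwargs(job_names: List[str], environment: str) -> List[Dict[str, Any]]:
    """Return one parameters dict per child flow run.

    The keys used here (job_name, environment, index) are illustrative
    assumptions; use whatever parameters the child flow actually declares.
    """
    return [
        {"job_name": name, "environment": environment, "index": i}
        for i, name in enumerate(job_names)
    ]


if __name__ == "__main__":
    # The list length is only known at runtime: 1, 8, or 50+ entries all work.
    for kwargs in build_child_run_kwargs(["a", "b", "c"], "dev"):
        print(kwargs)
```

In a Prefect v1 flow, logic like this would presumably live inside the generate_params task, with the returned list passed as the mapped parameters argument, so the number of child runs follows the length of the list.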
Sean Talia
01/12/2023, 3:51 PM
create_flow_run v. StartFlowRun, the documentation isn't particularly clarifying... also does the different casing on these tasks signify anything?
James Sopkin
01/12/2023, 4:06 PM
Sean Talia
01/12/2023, 4:07 PM
StartFlowRun is camel case and create_flow_run is snake case
James Sopkin
01/12/2023, 4:08 PM
Sean Talia
01/12/2023, 4:08 PM
StartFlowRun is just kept to preserve backwards compatibility?
James Sopkin
01/12/2023, 4:08 PM