Mitch

08/24/2023, 8:03 PM
@Marvin how can I loop over a list and start a flow for each of the parameters, while waiting for all of them to finish?

Marvin

08/24/2023, 8:04 PM
Oh, the joy of looping over a list and starting a flow for each parameter. It's almost as exciting as watching paint dry. But, alas, I'm here to assist you, not to share my existential dread. In Prefect, you can achieve this by creating a flow with a loop that iterates over your list of parameters. For each parameter, you'd start a new flow run. Here's a rough idea of how you might structure this:
python
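from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# Define your list of parameters
params = ['param1', 'param2', 'param3']

# Prefect 1 task that starts a run of another registered flow.
# wait=True makes the task block until that flow run has finished.
# "My Project" is a placeholder for the project the other flow is registered in.
start_flow_run = StartFlowRun(
    flow_name="Other Flow", project_name="My Project", wait=True
)

# Create a new flow
with Flow("My Flow") as flow:
    # Start a new flow run for each parameter; the task must be called to be added to the flow
    for param in params:
        start_flow_run(parameters={'param': param})

# Run the flow (starting flow runs requires a Prefect Cloud or Server backend)
flow.run()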
In this example, Other Flow would be the name of the flow you want to start for each parameter. You'd replace 'param' with the actual name of the parameter in the other flow. Remember, Prefect encourages "small tasks": each one should represent a single logical step of your workflow. This allows Prefect to better contain task failures. Also, when creating flow runs from the Prefect API, parameter names must be specified when overriding defaults; they cannot be positional. I hope this helps. Or at least, I hope it doesn't contribute to the heat death of the universe.

Mitch

08/24/2023, 8:07 PM
@Marvin that seems to be in prefect 1, not prefect 2. Please redo it with prefect 2 imports.

Marvin

08/24/2023, 8:08 PM
Ah, the ever-changing landscape of software versions. It's like trying to find a sense of purpose in an uncaring universe. But I digress. Let's get you updated to Prefect 2. In Prefect 2, flows are no longer built with the with Flow(...) context manager, and the StartFlowRun and create_flow_run tasks from Prefect 1 no longer exist. A flow is a Python function decorated with @flow, and calling one flow from inside another starts a subflow run that blocks until it finishes. Here's how you might structure your flow:
python
from prefect import flow

# Define your list of parameters
params = ['param1', 'param2', 'param3']

# The flow to start once per parameter
@flow(name="Other Flow")
def other_flow(param: str):
    print(f"Running with {param}")

# The parent flow
@flow(name="My Flow")
def my_flow(flow_param: list):
    # Each call starts a subflow run and returns once that run has finished
    for param in flow_param:
        other_flow(param=param)

# Run the flow
if __name__ == "__main__":
    my_flow(flow_param=params)
In this example, Other Flow is the flow you want to start for each parameter. You'd replace param with the actual name of the parameter in the other flow. Because each subflow call blocks, the parent flow finishes only after every run has completed. Also, when creating flow runs from the Prefect API, parameter names must be specified when overriding defaults; they cannot be positional. I hope this helps. Or at least, I hope it doesn't contribute to the heat death of the universe.

Nate

08/24/2023, 11:20 PM
hi @Mitch - couple different ways to do this, depending on whether you want to run the subflows on their own infra or not. If not:
from prefect import flow

@flow
def bar(msg: str):
    print(msg)

@flow
def foo():
    # Each call runs bar as a subflow and blocks until it finishes
    for baz in ["you get a baz", "and you get a baz", "and you!"]:
        bar(baz)

foo()
If yes, then do prefect deploy on bar and then replace bar(baz) with run_deployment(..., parameters=dict(msg=baz)), where run_deployment is imported from prefect.deployments.
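Both variants above start the runs one after another, since each call blocks until its run has finished. If the runs should overlap while the caller still waits for all of them, one possible shape is asyncio.gather. The following is a minimal sketch in plain Python, not Prefect API: run_all and fake_run are hypothetical names, and fake_run merely stands in for whatever awaitable call starts a run and waits for it.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List


async def run_all(
    start_run: Callable[[dict], Awaitable[Any]],
    parameter_sets: Iterable[dict],
) -> List[Any]:
    """Start one run per parameter set and wait until every run has finished."""
    # gather schedules all coroutines concurrently and returns their results
    # in the same order as the inputs.
    return await asyncio.gather(*(start_run(p) for p in parameter_sets))


async def fake_run(parameters: dict) -> str:
    """Hypothetical stand-in for a call that starts a run and waits for it."""
    await asyncio.sleep(0.01)  # simulate the run taking some time
    return f"finished {parameters['msg']}"


messages = ["you get a baz", "and you get a baz", "and you!"]
results = asyncio.run(run_all(fake_run, [dict(msg=m) for m in messages]))
print(results)
```

In an async Prefect 2 flow, the stand-in would presumably be swapped for an awaited subflow call or an awaited run_deployment call; that part is an assumption to check against the Prefect docs for your version.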