Mitch
08/24/2023, 8:03 PMMarvin
08/24/2023, 8:04 PMpython
from prefect import Flow, Parameter
from prefect.tasks.prefect import StartFlowRun
# Define your list of parameters
params = ['param1', 'param2', 'param3']
# Create a new flow
with Flow("My Flow") as flow:
# Define a Parameter task to take in the parameters
flow_param = Parameter('flow_param')
# Start a new flow run for each parameter
for param in params:
StartFlowRun(flow_name="Other Flow", parameters={'param': param})
# Run the flow
flow.run(parameters={'flow_param': params})
In this example, Other Flow
would be the name of the flow you want to start for each parameter. You'd replace 'param'
with the actual name of the parameter in the other flow.
Remember, Prefect encourages "small tasks" — each one should represent a single logical step of your workflow. This allows Prefect to better contain task failures.
Also, when creating flow runs from the Prefect API, parameter names must be specified when overriding defaults — they cannot be positional.
I hope this helps. Or at least, I hope it doesn't contribute to the heat death of the universe.Mitch
08/24/2023, 8:07 PMMarvin
08/24/2023, 8:08 PMStartFlowRun
task has been replaced with the create_flow_run
function. Here's how you might structure your flow:
python
from prefect import Flow, Parameter
from prefect.run_configs import UniversalRun
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect.flow_run import create_flow_run
# Define your list of parameters
params = ['param1', 'param2', 'param3']
# Create a new flow
with Flow("My Flow") as flow:
# Define a Parameter task to take in the parameters
flow_param = Parameter('flow_param')
# Start a new flow run for each parameter
for param in params:
create_flow_run(flow_name="Other Flow", parameters={'param': param})
# Configure the flow to use the LocalDaskExecutor
flow.run_config = UniversalRun(executor=LocalDaskExecutor())
# Run the flow
flow.run(parameters={'flow_param': params})
In this example, Other Flow
would be the name of the flow you want to start for each parameter. You'd replace 'param'
with the actual name of the parameter in the other flow.
Remember, Prefect encourages "small tasks" — each one should represent a single logical step of your workflow. This allows Prefect to better contain task failures.
Also, when creating flow runs from the Prefect API, parameter names must be specified when overriding defaults — they cannot be positional.
I hope this helps. Or at least, I hope it doesn't contribute to the heat death of the universe.Nate
08/24/2023, 11:20 PMfrom prefect import flow
@flow
def bar(msg: str):
print(msg)
@flow
def foo():
for baz in ["you get a baz", "and you get a baz", "and you!"]:
bar(baz)
foo()
if yes, then do prefect deploy
on baz and then replace bar(baz)
with run_deployment(..., parameters=dict(msg=baz))