<@ULVA73B9P> how can I run a flow in parallel with...
# ask-marvin
p
@Marvin how can I run a flow in parallel with different parameters?
m
To run a flow in parallel with different parameters, you can use subflows. Subflows are just flows that are called from within a flow. You can start multiple subflows in parallel and each subflow can have its own set of parameters. Here is an example of how you can do this:
import asyncio

from prefect import flow

# Define your subflows (async so they can run concurrently)
@flow
async def subflow_1(param):
    # Your code here
    pass

@flow
async def subflow_2(param):
    # Your code here
    pass

# Define your main flow; its arguments are the flow parameters
@flow
async def main_flow(param_1, param_2):
    # Calling an async flow returns a coroutine; gather awaits them concurrently
    parallel_subflows = [subflow_1(param_1), subflow_2(param_2)]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    # "a" and "b" are placeholder parameter values
    asyncio.run(main_flow(param_1="a", param_2="b"))
In this example, subflow_1 and subflow_2 are run in parallel with different parameters, param_1 and param_2. You can also provide default parameter values in the deployment.yaml configuration; these values are used for flow runs created from the deployment. To configure default parameter values, add them to the parameters: {} line of deployment.yaml as JSON key-value pairs. The parameter list configured in deployment.yaml must match the parameters expected by the entrypoint flow function. For example:
parameters: {"name": "Marvin", "num": 42}
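As a rough illustration of the matching rule above, the sketch below checks a parameters mapping against a function signature using Python's inspect module. The names entrypoint and check_parameters are hypothetical, and this is not how Prefect itself validates deployment parameters; it also ignores *args and **kwargs for simplicity.

```python
import inspect


def entrypoint(name: str, num: int = 0):
    """Hypothetical stand-in for an entrypoint flow function."""
    return f"{name}:{num}"


def check_parameters(func, parameters: dict) -> list:
    """Return a list of problems found when matching parameters to func."""
    signature = inspect.signature(func)
    problems = []
    # Keys that the function does not accept
    for key in parameters:
        if key not in signature.parameters:
            problems.append(f"unexpected parameter: {key}")
    # Required arguments (those without a default) that are missing
    for key, param in signature.parameters.items():
        if param.default is inspect.Parameter.empty and key not in parameters:
            problems.append(f"missing parameter: {key}")
    return problems


# Matches the signature, so no problems are reported
ok = check_parameters(entrypoint, {"name": "Marvin", "num": 42})
# Misspelled key: reported as unexpected, and "name" is reported as missing
bad = check_parameters(entrypoint, {"nme": "Marvin"})
```

Running a check like this before deploying can surface a misspelled key early, instead of at flow run time.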
You can find more information in the Prefect documentation and on the Prefect Discourse.
p
@Marvin I am looking to run run_namespaced_job from prefect_kubernetes.flows in each of the parallel tasks. It seems run_namespaced_job is a flow, so I can't run it under a task.
m
In Prefect, you can call tasks or subflows to do work within your workflow, including passing results from other tasks to your subflow. If run_namespaced_job is a flow, you can call it as a subflow within your main flow. Here is an example of how you can do this:
import asyncio

from prefect import flow

# Stand-in for run_namespaced_job from prefect_kubernetes.flows
@flow
async def run_namespaced_job(param):
    # Your code here
    pass

# Define your main flow; its argument is the flow parameter
@flow
async def main_flow(param):
    # Start 10 subflow runs and wait for all of them to finish
    parallel_subflows = [run_namespaced_job(param) for _ in range(10)]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    # "example" is a placeholder parameter value
    asyncio.run(main_flow(param="example"))
In this example, run_namespaced_job is run 10 times in parallel with the same parameter param. You can find more information in the Prefect documentation and on the Prefect Discourse.
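The example above passes the same param to every run. To give each run its own parameter, which was the original question, one option is to build the list of calls from a collection of values. The sketch below uses plain asyncio coroutines as stand-ins rather than Prefect flows, so run_job and main are hypothetical names; the assumption is that the same shape carries over to async flows decorated with @flow.

```python
import asyncio


async def run_job(param):
    """Hypothetical stand-in for an async subflow such as run_namespaced_job."""
    await asyncio.sleep(0.01)  # simulate waiting on external work
    return f"done:{param}"


async def main(params):
    # One coroutine per parameter value; gather awaits them concurrently
    # and returns the results in the same order as the inputs.
    return await asyncio.gather(*(run_job(p) for p in params))


results = asyncio.run(main(["a", "b", "c"]))
```

Because gather preserves input order, each result can be matched back to the parameter that produced it.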