Pooyan
11/22/2023, 7:54 AMMarvin
11/22/2023, 7:55 AMfrom prefect import Flow, Parameter
import asyncio
# Define your subflows
@task
def subflow_1(param):
# Your code here
pass
@task
def subflow_2(param):
# Your code here
pass
# Define your main flow
@flow
def main_flow():
param_1 = Parameter("param_1")
param_2 = Parameter("param_2")
parallel_subflows = [subflow_1(param_1), subflow_2(param_2)]
gather(*parallel_subflows)
if __name__ == "__main__":
main_flow_state = asyncio.run(main_flow())
In this example, subflow_1
and subflow_2
are run in parallel with different parameters param_1
and param_2
.
You can also provide default parameter values in the deployment.yaml
configuration, and these parameter values will be used for flow runs based on the deployment. To configure default parameter values, add them to the parameters: {}
line of deployment.yaml
as JSON key-value pairs. The parameter list configured in deployment.yaml
must match the parameters expected by the entrypoint flow function. For example:
yaml
parameters: {"name": "Marvin", "num": 42}
You can find more information in the Prefect documentation and on the Prefect Discourse.Pooyan
11/22/2023, 7:58 AMrun_namespaced_job
from prefect_kubernetes.flows
in each of the parallel tasks. It seems run_namespaced_job
is a flow so I cant run it under a taskMarvin
11/22/2023, 7:58 AMrun_namespaced_job
is a flow, you can call it as a subflow within your main flow.
Here is an example of how you can do this:
from prefect import Flow, Parameter
import asyncio
# Define your subflows
@flow
def run_namespaced_job(param):
# Your code here
pass
# Define your main flow
@flow
def main_flow():
param = Parameter("param")
parallel_subflows = [run_namespaced_job(param) for _ in range(10)]
gather(*parallel_subflows)
if __name__ == "__main__":
main_flow_state = asyncio.run(main_flow())
In this example, run_namespaced_job
is run 10 times in parallel with the same parameter param
.
You can find more information in the Prefect documentation and on the Prefect Discourse.