Lina M
03/21/2025, 2:35 PM
Marvin
03/21/2025, 2:35 PM
`create_flow_run_from_deployment` method to trigger these deployments. This allows you to run them concurrently.
Here's a basic example using Python's `asyncio` to run multiple subflows in parallel:
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run


async def main(num_runs: int):
    async with get_client() as client:
        # Create one flow run per iteration; each call returns once the run is scheduled.
        flow_runs = [
            await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
            for _ in range(num_runs)
        ]
        # Wait on all runs at once; asyncio.gather polls them concurrently.
        coros = [wait_for_flow_run(flow_run_id=flow_run.id) for flow_run in flow_runs]
        finished_flow_runs = await asyncio.gather(*coros)
        print([flow_run.state for flow_run in finished_flow_runs])


if __name__ == "__main__":
    asyncio.run(main(num_runs=10))
```
In this example, replace `"my-deployment-id"` with the actual ID (a UUID) of the deployment for your subflows. The script creates 10 flow runs from that deployment and then waits for all of them concurrently.
For more detailed information, you can refer to the Prefect documentation on workflows and deployments, and on flow runs.
Lina M
03/21/2025, 2:42 PM
Marvin
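03/21/2025, 2:42 PM
```python
from prefect import flow
from prefect.deployments import run_deployment


# A @task cannot request its own pod; it runs wherever the parent flow's task runner
# puts it. So the unit of work that needs its own pod is wrapped in a flow and deployed
# to a Kubernetes work pool.
@flow
def my_work():
    print("Running in a Kubernetes pod")


@flow
def my_flow():
    for _ in range(10):
        # Each call creates a new flow run of the deployment. The worker polling the
        # Kubernetes work pool launches a separate Kubernetes Job (and pod) for each run.
        # timeout=0 returns immediately instead of waiting for the run to finish.
        run_deployment(name="my-work/k8s", timeout=0)


if __name__ == "__main__":
    # One-time setup: register my_work as a deployment that uses an existing image.
    my_work.deploy(
        name="k8s",
        work_pool_name="my-k8s-pool",
        image="your-docker-image",
        build=False,
        push=False,
    )
    my_flow()
```
In this example, replace "your-docker-image" with a Docker image that already contains your flow code, and "my-k8s-pool" with the name of your Kubernetes work pool. Each `run_deployment` call creates a flow run of `my_work` that the Kubernetes worker executes in its own pod. Resource requests such as `cpu: 500m` and `memory: 256Mi` go in the work pool's base job template rather than in code.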
For more detailed guidance, you can refer to the Prefect Kubernetes deployment documentation.
Damian Birchler
03/21/2025, 2:46 PM
Lina M
03/21/2025, 2:48 PM
Damian Birchler
03/21/2025, 2:49 PM
Damian Birchler
03/21/2025, 2:50 PM
Damian Birchler
03/21/2025, 2:52 PM
Lina M
03/21/2025, 3:00 PM
Nate
03/21/2025, 3:01 PM
Lina M
03/21/2025, 3:01 PM
Damian Birchler
03/21/2025, 3:01 PM
`dask` cluster (https://kubernetes.dask.org/en/latest/) that I'd set up previously...
Lina M
03/21/2025, 3:05 PM
Damian Birchler
03/21/2025, 3:06 PM
Lina M
03/21/2025, 3:08 PM
Nate
03/21/2025, 3:16 PM
`__call__` a `@flow` / `@task`-decorated function, it will run in the parent process like a normal Python function.
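To make the in-process point concrete, here is a toy model using only the standard library. `toy_flow` is a hypothetical stand-in, not Prefect's decorator: it wraps a function with some bookkeeping but still executes the body directly in the caller's process and thread, which is the behavior described for calling a `@flow` / `@task`-decorated function.

```python
import os
import threading
from functools import wraps
from typing import Any, Callable


def toy_flow(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: records the call, then runs fn inline."""
    @wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        wrapper.calls += 1  # stand-in for orchestration bookkeeping
        return fn(*args, **kwargs)  # no new process, thread, or pod
    wrapper.calls = 0
    return wrapper


@toy_flow
def where_am_i() -> tuple[int, str]:
    # Report the process ID and thread this function body actually ran in.
    return os.getpid(), threading.current_thread().name


if __name__ == "__main__":
    pid, thread_name = where_am_i()
    print(pid == os.getpid(), thread_name)
```

Run as a script, this prints `True MainThread`: the decorator adds bookkeeping but does not move the work anywhere.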
If you use `run_deployment("workflowelsewhere/onk8s", parameters=...)`, you are effectively making a `POST` to the API's create-flow-run-from-deployment endpoint to spin up work according to whatever infrastructure setup (work pool) you have configured for the deployment you're triggering. For convenience, we track that invoked flow run as a "subflow" of the parent in the Prefect UI.
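Roughly, `run_deployment` resolves the `"flow-name/deployment-name"` string to a deployment ID and then asks the API to create a flow run for it; the worker polling that deployment's work pool picks the run up and provisions the infrastructure. The sketch below builds, without sending, that core request using only the standard library. It assumes a self-hosted Prefect server at `http://127.0.0.1:4200/api` and its `POST /deployments/{id}/create_flow_run` route; `build_create_flow_run_request` is a hypothetical helper, and the deployment ID and `{"x": 1}` parameters are placeholders. In practice you would call `run_deployment` or the Prefect client rather than hand-rolling HTTP.

```python
import json
import urllib.request
from typing import Any


def build_create_flow_run_request(
    api_url: str, deployment_id: str, parameters: dict[str, Any]
) -> urllib.request.Request:
    """Build, but do not send, a request that creates a flow run from a deployment.

    Assumes the Prefect server route POST {api_url}/deployments/{id}/create_flow_run.
    """
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    body = json.dumps({"parameters": parameters}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    # Placeholder URL and deployment ID; Prefect Cloud would also require an API key.
    req = build_create_flow_run_request(
        "http://127.0.0.1:4200/api",
        "00000000-0000-0000-0000-000000000000",
        {"x": 1},
    )
    print(req.get_method(), req.full_url)
```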
Finally, just for completeness: the `.submit` / `.map` interface on `@task`-decorated functions is just a consistent way to hand off concurrency to some backend (whether `concurrent.futures`, Ray, or Dask). For example, with the default `ThreadPoolTaskRunner`, `task(echo).map(range(3))` runs `echo` concurrently 3 times, with 0, 1, and 2 as inputs, each task in its own thread. `DaskTaskRunner` would run each task in parallel on Dask workers.
Damian Birchler
03/21/2025, 3:19 PM
`DaskTaskRunner` could mean that the tasks would run on a different machine, in a different pod, etc., given the right `dask` setup, right?
Nate
03/21/2025, 3:20 PM
Damian Birchler
03/21/2025, 3:21 PM
Lina M
03/21/2025, 3:21 PM
Nate
03/21/2025, 3:24 PM
```python
#[6]
from prefect import flow, task

#[7]
@flow
def f(): task(lambda x: x + 1).map(range(3)).result()

#[8]
f()
```
```
10:23:45.021 | INFO | Flow run 'original-pogona' - Beginning flow run 'original-pogona' for flow 'f'
10:23:45.071 | INFO | Task run '<lambda>-f60' - Finished in state Completed()
10:23:45.073 | INFO | Task run '<lambda>-255' - Finished in state Completed()
10:23:45.074 | INFO | Task run '<lambda>-46d' - Finished in state Completed()
10:23:45.266 | INFO | Flow run 'original-pogona' - Finished in state Completed()
```
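The log above shows three task runs, one per input from `range(3)` (that is, `0`, `1`, and `2`), so `.result()` on the mapped futures would give `[1, 2, 3]`. Since `.map` with the default thread-pool runner behaves much like submitting each input to a `concurrent.futures` pool, here is a stdlib-only analogy; it is not Prefect's implementation, and `map_in_threads` and `add_one_and_report` are hypothetical helpers.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_in_threads(fn: Callable[[T], R], inputs: Iterable[T]) -> list[R]:
    # Submit one call per input to a thread pool, then collect results in input order,
    # similar in spirit to task(fn).map(inputs).result() with a thread-pool runner.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fn, x) for x in inputs]
        return [f.result() for f in futures]


def add_one_and_report(x: int) -> tuple[int, str]:
    # Return the result plus the name of the worker thread that computed it.
    return x + 1, threading.current_thread().name


if __name__ == "__main__":
    results = map_in_threads(add_one_and_report, range(3))
    print([value for value, _ in results])  # [1, 2, 3]
    print(all(name != "MainThread" for _, name in results))  # True
```

Swapping the thread pool for a Dask client's `submit` is, conceptually, what moves the same calls onto Dask workers.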
Does that answer your question?
Lina M
03/21/2025, 3:25 PM
Nate
03/21/2025, 3:29 PM
"each using its own Dask Cluster" sounds (naively, not knowing the details of your use case) like it could be a source of non-trivial overhead. Feel free to open a discussion with details if you're so inclined!
Lina M
03/21/2025, 3:30 PM