<@ULVA73B9P> I want to run sub-flows on separate p...
# ask-marvin
l
@Marvin I want to run sub-flows on separate pods to run in parallel, can you give me an example?
m
To run sub-flows on separate pods in parallel using Prefect 3.x, you can use deployments. This allows each sub-flow to be executed independently, often on separate infrastructure such as a Kubernetes pod. Here's the general approach:
1. Create deployments: First, create a deployment for each sub-flow. Each deployment specifies the infrastructure, such as a Kubernetes pod, where the flow should run.
2. Trigger sub-flows: Use the create_flow_run_from_deployment client method to trigger these deployments, which allows you to run them concurrently.
Here's a basic example using Python's asyncio to run multiple sub-flows in parallel:
```python
import asyncio
from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run

async def main(num_runs: int):
    async with get_client() as client:
        flow_runs = [
            await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
            for _ in range(num_runs)
        ]
        coros = [wait_for_flow_run(flow_run_id=flow_run.id) for flow_run in flow_runs]
        finished_flow_runs = await asyncio.gather(*coros)
        print([flow_run.state for flow_run in finished_flow_runs])

if __name__ == "__main__":
    asyncio.run(main(num_runs=10))
```
In this example, replace "my-deployment-id" with the actual deployment ID of your sub-flows. This script will create and wait for 10 concurrent flow runs. For more detailed information, you can refer to the Prefect documentation on workflows and deployments and flow runs.
l
@Marvin How can we create a pod for each task?
m
To create a pod for each task in Prefect using Kubernetes, you can use the KubernetesJob infrastructure block. This allows each task to run in its own Kubernetes pod, providing isolation and scalability. Here's a basic outline of how to set this up:
1. Define a Kubernetes job block: Configure a Kubernetes job block that specifies the pod template for each task, including resource requests, environment variables, and other Kubernetes-specific configuration.
2. Assign tasks to use the Kubernetes infrastructure: When defining tasks, specify the Kubernetes infrastructure block so that each task runs in its own pod.
Here's a conceptual example to help you get started:
```python
from prefect import task, flow
from prefect.infrastructure.kubernetes import KubernetesJob

# Define a Kubernetes job block
k8s_job = KubernetesJob(
    job_configuration={
        "template": {
            "spec": {
                "containers": [{
                    "name": "prefect-task",
                    "image": "your-docker-image",
                    "resources": {
                        "requests": {
                            "cpu": "500m",
                            "memory": "256Mi"
                        }
                    }
                }],
                "restartPolicy": "Never"
            }
        }
    }
)

@task(infrastructure=k8s_job)
def my_task():
    print("Running in a Kubernetes pod")

@flow
def my_flow():
    for _ in range(10):
        my_task.submit()

if __name__ == "__main__":
    my_flow()
```
In this example, replace "your-docker-image" with the Docker image you want to use for your tasks. The my_task function will run in a separate Kubernetes pod for each invocation. For more detailed guidance, you can refer to the Prefect Kubernetes deployment documentation.
d
Forgive me for interrupting, but I think the "modern" way (?) is Kubernetes-backed work pools (I was also just looking into this, so I took the liberty to chime in).
l
Hi Damian, thanks for your input! Is this in relation to the pod-per-subflow question or the pod-per-task question? And can you please provide more details?
d
pod per subflow. per my understanding, for pod per task you'd have to go through dask or ray and have them schedule stuff on kubernetes
which is why my head's spinning, it's like "cloud"-ception 😄
I searched the docs for KubernetesJob because I've not seen that before and was intrigued, but I only found something about migrating from it to work pools, which I have read about. That's why I thought KubernetesJob was outdated.
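For what it's worth, here is a rough, untested sketch of the work-pool route. The pool name, deployment name, and image are placeholders; it assumes a Kubernetes work pool already exists (e.g. created with prefect work-pool create my-k8s-pool --type kubernetes) and that a worker is polling it.
```python
from prefect import flow


@flow(log_prints=True)
def my_subflow(batch: int = 0):
    print(f"processing batch {batch} in its own pod")


if __name__ == "__main__":
    # register a deployment on the Kubernetes work pool; each flow run created
    # from it is then executed by the worker as its own Kubernetes job (pod)
    my_subflow.deploy(
        name="on-k8s",                            # placeholder deployment name
        work_pool_name="my-k8s-pool",             # placeholder work pool name
        image="your-registry/your-image:latest",  # placeholder image for the pod
    )
```
Each run created from that deployment (for example via run_deployment from a parent flow) then gets its own pod, which is the pod-per-subflow setup discussed above.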
l
Would the Dask task runner actually spawn new pods? Because in the documentation, they say that Prefect creates a local cluster (which I think means within the pod).
n
> Would the Dask task runner actually spawn new pods?
No. Task runs (regardless of task runner) run in-process in the parent flow. You can define separate deployments (like this example) to dispatch work onto other infra from some parent context.
> which I think means within the pod
Correct: https://github.com/PrefectHQ/prefect-recipes/blob/main/flows-advanced/parent-orchestrator/pokemon_weight.py
👍 1
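A minimal sketch of that parent-orchestrator pattern using run_deployment; the deployment name "my-subflow/on-k8s" and the parameters are placeholders, and it assumes such a deployment already exists on a work pool with its own infrastructure (e.g. Kubernetes).
```python
import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow
async def parent_orchestrator(num_runs: int = 3):
    # each call creates a flow run from the deployment; the work pool's worker
    # runs it on its own infrastructure (e.g. one Kubernetes pod per run) and
    # the run is tracked as a subflow of this parent in the UI
    runs = await asyncio.gather(
        *[
            run_deployment(
                name="my-subflow/on-k8s",  # placeholder "flow-name/deployment-name"
                parameters={"batch": i},   # placeholder parameters
            )
            for i in range(num_runs)
        ]
    )
    print([run.state for run in runs])


if __name__ == "__main__":
    asyncio.run(parent_orchestrator())
```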
l
And I totally get you, my head is literally spinning. I finally got my self-hosted Dask clusters to run in parallel within the same flow, but at writing time, the pod becomes a bottleneck :')
d
i think it creates a local cluster by default, but I thought to connect it to a Kubernetes-backed dask cluster (https://kubernetes.dask.org/en/latest/) that I'd set up previously...
l
Yes, @Damian Birchler. That would work pretty well. Dask will create its own workers (pods) and it will complete the task correctly. In my case, I want to increase this degree of parallelization and use multiple dask clusters at once
d
couldn't you just add more nodes to the cluster, instead of having more clusters?
l
Thanks for your answer, @Nate! I see what I can do now. I think that would work for me. Is this the same as subflows? I mean, would a subflow also use the parent flow's infra?
n
> Is this the same as subflows? I mean, would a subflow also use the parent flow's infra?
If you just __call__ a @flow / @task decorated function, it will run in the parent process like a normal Python function. If you use run_deployment("workflowelsewhere/onk8s", parameters=...) you are effectively doing POST /create_flow_run_from_deployment to spin up work according to whatever infra setup (work pool) you have configured for the deployment you're triggering; for convenience, we track that invoked flow run as a "subflow" of the parent in the Prefect UI. Finally, just for completeness, the .submit / .map interface on @task decorated functions is just a consistent way to defer concurrency responsibilities somehow (whether to concurrent.futures, ray, or dask). E.g. with the default ThreadPoolTaskRunner, task(echo).map(range(3)) runs echo concurrently 3 times, with 0, 1, and 2 as inputs, each task in its own thread. DaskTaskRunner would run each task in parallel in its own dask process.
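For contrast, a minimal sketch of the first case (flow names here are placeholders): calling a @flow-decorated function directly runs it as a subflow inside the parent process, i.e. on the same pod as the parent flow run, with no extra infrastructure involved.
```python
from prefect import flow


@flow
def child(n: int) -> int:
    return n + 1


@flow
def parent() -> list[int]:
    # plain calls: each child run shows up as a subflow in the UI,
    # but executes inside the parent flow's own process
    return [child(i) for i in range(3)]


if __name__ == "__main__":
    print(parent())  # [1, 2, 3]
```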
d
Just to double check, using DaskTaskRunner could mean that the tasks would run on a different machine/in a different pod/etc., given the right dask setup, right?
n
i personally haven't tried the dask-kubernetes thing you alluded to, but DaskTaskRunner should allow full passthrough config via cluster_kwargs, so if you can configure dask to spin up new things as separate pods, then we should allow that. By default, they're separate processes on the ephemeral cluster we spin up.
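An untested sketch of what that passthrough might look like, assuming prefect-dask and dask-kubernetes are installed; the exact KubeCluster kwargs depend on your dask_kubernetes version and the cluster name/image below are placeholders.
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def crunch(n: int) -> int:
    return n * n


# cluster_class / cluster_kwargs are handed straight to dask, so pointing them
# at dask_kubernetes should give one worker pod per dask worker
@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_kubernetes.operator.KubeCluster",  # assumes the dask-kubernetes operator
        cluster_kwargs={
            "name": "prefect-dask",               # placeholder cluster name
            "image": "ghcr.io/dask/dask:latest",  # placeholder worker image
            "n_workers": 3,
        },
    )
)
def parallel_flow() -> list[int]:
    # each mapped task run is submitted to the dask cluster's worker pods
    return crunch.map(range(10)).result()


if __name__ == "__main__":
    parallel_flow()
```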
d
great, thanks!
l
I feel like this question might be a bit off, but what would be the difference here (using the Dask task runner) between threads and processes?
n
when you use DaskTaskRunner, you're having dask spin up separate processes (however exactly dask does that) for each task run you submit, rather than using threads. Using threads to run tasks concurrently is the default experience if you do this (i.e. no task runner config -> ThreadPoolTaskRunner):
```python
#[6]
from prefect import flow, task

#[7]
@flow
def f(): task(lambda x: x + 1).map(range(3)).result()

#[8]
f()
10:23:45.021 | INFO    | Flow run 'original-pogona' - Beginning flow run 'original-pogona' for flow 'f'
10:23:45.071 | INFO    | Task run '<lambda>-f60' - Finished in state Completed()
10:23:45.073 | INFO    | Task run '<lambda>-255' - Finished in state Completed()
10:23:45.074 | INFO    | Task run '<lambda>-46d' - Finished in state Completed()
10:23:45.266 | INFO    | Flow run 'original-pogona' - Finished in state Completed()
```
does that answer your question?
l
A bit. Right now I am using threads, and at a certain point the tasks (each using its own Dask cluster) become sequential because of the I/O operations. I would think processes would help with this; what do you think?
n
my bias is that most things people are doing in Python are IO-bound and don't need true parallelism (concurrency is good enough); async Python + threads + good design is usually sufficient. There are certainly cases where this is not true. This
> each using its own Dask cluster
sounds (naively, not knowing the details of your use case) like it could be a source of non-trivial overhead. Feel free to open a discussion with details if you're so inclined!
👍 1
l
I see what you mean. Concurrency did help (it reduced execution time by 50%), but I am aiming for more. I will keep what you said in mind. Thanks a lot for your answer, @Nate! You too, @Damian Birchler.
catjam 1