# marvin-in-the-wild
m
@Marvin can multiple tasks from the same `flow` be scheduled on multiple workers to increase parallelism?
m
Yes, multiple tasks from the same flow run can execute in parallel on multiple workers, although these are task-runner workers (for example, Dask workers), not Prefect workers polling a work pool. Prefect supports this through integrations such as Dask. The `prefect-dask` integration offers:
- Temporary local cluster: Prefect can create a temporary local Dask cluster that automatically uses the available cores.
- Custom worker configuration: you can set the number of workers and the threads per worker through `cluster_kwargs` on `DaskTaskRunner`.

Here's a brief example using Dask:
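```python
import dask.datasets
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


@task
def compute_task():
    # get_dask_client gives the task a client connected to the flow's Dask cluster
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = df.describe().compute()
    return summary_df


# Temporary local cluster with 4 Dask workers x 2 threads each
@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}))
def dask_flow():
    # .submit() hands the task to the Dask task runner instead of running it inline
    future = compute_task.submit()
    return future.result()


if __name__ == "__main__":
    dask_flow()
```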
The detailed documentation for the Dask integration is in the `prefect-dask` docs.

##### Global Concurrency Limits
You can also cap the number of tasks running in parallel with concurrency limits. For details, see the Global Concurrency Limits page in the Prefect documentation.

If you have any specific configurations or additional questions, feel free to ask!
m
What if I don't want to use Dask or Ray? @Marvin
m
If you prefer not to use Dask or Ray, you can still run tasks concurrently with Prefect's built-in task runners and features. Here are a few strategies:

1. ConcurrentTaskRunner
Prefect supports concurrent task execution natively, without external libraries like Dask or Ray, through `ConcurrentTaskRunner` (the default task runner in Prefect 2; Prefect 3 calls it `ThreadPoolTaskRunner`). It runs submitted tasks in threads inside the flow run's own process, so it helps most with I/O-bound work. To configure it explicitly:
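```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@task
def my_task(x):
    return x * 2


@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # .submit() returns futures immediately; the tasks run concurrently in threads
    futures = [my_task.submit(i) for i in range(10)]
    # Wait for every task and collect the actual values
    return [f.result() for f in futures]


if __name__ == "__main__":
    print(my_flow())
```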
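To make concrete what "concurrent" means for a thread-based task runner, the stdlib-only sketch below models one with `concurrent.futures.ThreadPoolExecutor`. It is an analogy, not Prefect's implementation: `my_task` and `run_flow` are hypothetical stand-ins. The point it shows is that every submitted task runs in a thread of the same process, so this approach does not spread work across separate containers or machines.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor


def my_task(x):
    # Record where the "task" ran: same PID as the caller, possibly another thread
    return x * 2, os.getpid(), threading.current_thread().name


def run_flow(n=10, max_workers=4):
    # Submit every task up front (like my_task.submit(i)), then wait on the futures
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(my_task, i) for i in range(n)]
        outputs = [f.result() for f in futures]
    results = [value for value, _, _ in outputs]
    pids = {pid for _, pid, _ in outputs}
    threads = {name for _, _, name in outputs}
    return results, pids, threads


if __name__ == "__main__":
    results, pids, threads = run_flow()
    print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    print(pids == {os.getpid()})  # True: every task ran in this process
```

Results come back in submission order because the futures are resolved in the order they were created, regardless of which thread finished first.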
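2. Sub-flows for concurrent execution
You can break your workflow into sub-flows. Flows do not have a `.submit()` method, so to run several sub-flows at the same time, make them async and await them together with `asyncio.gather`. Sub-flows called this way still run in the parent flow run's process; to run each one on its own infrastructure, deploy it and trigger it with `run_deployment`.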
```python
import asyncio

from prefect import flow, task


@task
async def task_a():
    return "result_a"


@task
async def task_b():
    return "result_b"


@flow
async def sub_flow():
    result_a = await task_a()
    result_b = await task_b()
    return result_a, result_b


@flow
async def main_flow():
    # Flows have no .submit(); gather runs the five async sub-flows concurrently
    return await asyncio.gather(*[sub_flow() for _ in range(5)])


if __name__ == "__main__":
    asyncio.run(main_flow())
```
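Running sub-flows concurrently relies on awaiting their coroutines together rather than one after another. The stdlib-only sketch below models that with plain coroutines and `asyncio.gather`; `sub_flow` and `main_flow` here are ordinary `async def` functions used as hypothetical stand-ins, not Prefect flows. The event log shows all five starting before any of them finishes, which is the concurrency you get from one event loop in one process.

```python
import asyncio

events = []


async def sub_flow(i):
    # Stand-in for a sub-flow that waits on I/O (an API call, a query, ...)
    events.append(("start", i))
    await asyncio.sleep(0.01)
    events.append(("end", i))
    return f"result_{i}"


async def main_flow(n=5):
    # gather schedules all coroutines at once and returns results in call order
    return await asyncio.gather(*[sub_flow(i) for i in range(n)])


if __name__ == "__main__":
    print(asyncio.run(main_flow()))
    # ['result_0', 'result_1', 'result_2', 'result_3', 'result_4']
```

Awaiting `sub_flow(i)` in a plain `for` loop instead would produce strictly alternating start/end events, one sub-flow at a time.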
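3. Using the concurrency context manager
Prefect's `concurrency` context manager caps how many pieces of work can hold a named global concurrency limit at the same time. Create the limit first (in the UI, or with `prefect gcl create data-processing --limit 3`), then have each task occupy a slot only while it does its work: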
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency


@task
def process_data(data):
    # Hold one slot of the "data-processing" limit only while doing the work,
    # so with a limit of 3 at most 3 process_data calls run at once
    with concurrency("data-processing", occupy=1):
        print(f"Processing: {data}")
        return f"Processed: {data}"


@flow
def main():
    futures = [process_data.submit(item) for item in range(10)]
    return [f.result() for f in futures]


if __name__ == "__main__":
    main()
```
For more detail, see the Prefect documentation on task runners and on global concurrency limits. If you have any more questions or specifics, please let me know!
m
@Marvin How do Task Runners connect to multiple workers within a single work pool?
m
Work pools and workers bridge Prefect's orchestration layer and your execution infrastructure, while task runners operate inside a single flow run. Here's how the pieces fit together:

### Conceptual Workflow
1. Work pools organize and schedule work; the unit of work they deliver is a flow run.
2. Workers poll the work pool for scheduled flow runs (with push work pools, Prefect instead submits runs directly to serverless infrastructure).
3. Once a worker starts a flow run, that run's task runner executes its tasks on the infrastructure the worker provisioned. Task runners do not hand tasks to other workers in the pool.

### Execution Flow
1. Work pool overview:
   - Work pools have types that correspond to the infrastructure, and they control how work is delivered.
   - Work pools are either pull (workers poll for work) or push (Prefect submits work to serverless infrastructure such as Google Cloud Run or Azure Container Instances).
2. Worker interaction:
   - Workers independently poll the work pool for new flow runs.
   - Several workers can connect to a single work pool, so many flow runs (including several runs of the same deployment) can execute in parallel. The tasks of any one flow run stay with the worker that picked it up.

### Example: Setting Up Work Pools and Workers
The flow below is the code side of this setup; its submitted tasks run concurrently through the flow's task runner:
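```python
from prefect import flow, task


@task
def my_task():
    return "Task completed."


@flow
def my_flow():
    # These tasks run concurrently via the flow's task runner,
    # all on the worker that picked up this flow run
    futures = [my_task.submit() for _ in range(10)]
    return [f.result() for f in futures]


if __name__ == "__main__":
    my_flow()
```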
- Flow creation: define your flow and tasks.
- Work pool configuration: create a work pool whose type matches your infrastructure.
- Worker deployment: start multiple workers, locally or in the cloud, that poll the work pool.

### Parallelism Across Workers
Each flow run is executed by a single worker. Adding workers therefore increases parallelism across flow runs, while parallelism among the tasks of a single flow run comes from its task runner (threads by default, or Dask/Ray for distributed execution).

### Relevant Documentation
- Task Runners
- Work Pools

This setup lets you scale the number of concurrently executing flow runs by adding workers to a pool.
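The distinction between flow-run parallelism and task parallelism can be modeled without Prefect. In the stdlib-only sketch below, two hypothetical workers poll a shared `queue.Queue` of flow runs; none of these names are Prefect APIs. Each worker takes a whole flow run and executes all of its tasks, so with a single flow run one worker does all the work while the other stays idle, which is the behavior discussed later in this thread.

```python
import queue
import threading


def make_flow_run(run_id, n_tasks):
    # A flow run is the unit a worker picks up; its tasks travel with it
    return {"id": run_id, "tasks": list(range(n_tasks))}


def worker(name, pool, log):
    while True:
        try:
            flow_run = pool.get_nowait()  # "poll" the work pool
        except queue.Empty:
            return
        for task in flow_run["tasks"]:
            # Every task of this run executes on this worker
            log.append((name, flow_run["id"], task))


def run_pool(flow_runs, n_workers=2):
    pool = queue.Queue()
    for fr in flow_runs:
        pool.put(fr)
    log = []
    threads = [
        threading.Thread(target=worker, args=(f"worker-{i}", pool, log))
        for i in range(n_workers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


if __name__ == "__main__":
    log = run_pool([make_flow_run("run-1", 10)])
    print({name for name, _, _ in log})  # exactly one worker name
```

With several flow runs in the queue, the runs are spread across workers, but each run's tasks still all land on one worker.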
n
hey @marrrcin have you checked this repo out?
in particular, you can horizontally scale task worker replicas arbitrarily without worrying about race conditions in scheduled task runs getting picked up, which is not quite possible with workers that listen for scheduled deployments today
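The property described here, adding task-worker replicas without two of them picking up the same task run, amounts to many consumers reading from one queue that hands each item to exactly one consumer. The stdlib-only sketch below models that with hypothetical names (`task_worker`, `run_task_workers`); it is not Prefect's background-task API, only an illustration of why individual task runs can be spread across however many replicas are running.

```python
import queue
import threading
from collections import Counter


def task_worker(name, task_runs, done, lock):
    while True:
        item = task_runs.get()
        if item is None:  # sentinel: shut this replica down
            return
        with lock:
            done.append((name, item))  # record which replica ran which task run


def run_task_workers(n_task_runs=20, replicas=3):
    task_runs = queue.Queue()
    done, lock = [], threading.Lock()
    workers = [
        threading.Thread(
            target=task_worker, args=(f"replica-{r}", task_runs, done, lock)
        )
        for r in range(replicas)
    ]
    for w in workers:
        w.start()
    for i in range(n_task_runs):
        task_runs.put(i)  # each item is delivered to exactly one get()
    for _ in workers:
        task_runs.put(None)  # one sentinel per replica, queued after all work
    for w in workers:
        w.join()
    return done


if __name__ == "__main__":
    done = run_task_workers()
    print(len(done))  # 20
    print(max(Counter(item for _, item in done).values()))  # 1: no duplicates
```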
m
@Nate I just want a simple thing...
```python
from prefect import flow, task


@task()
def machine_learning_pipeline(i):
    # my complex ML pipeline
    return i


@flow()
def my_flow():
    results = []
    for i in range(10):

        # I WANT EACH machine_learning_pipeline TO RUN ON A SEPARATE WORKER

        results.append(
            machine_learning_pipeline.submit(i)
        )
    print(results)
```
n
when you say separate worker, what do you mean in terms of infra? i.e.
• thread within the same process
• new process
• new machine
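The levels in this question isolate work differently. The stdlib-only sketch below contrasts the first two: a thread shares the caller's process ID, while a new Python interpreter started with `subprocess` gets its own. The third level (a new machine, container, or Kubernetes pod) is what infrastructure-backed work pools provide and cannot be shown in a local snippet. The function names are illustrative only.

```python
import os
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def pid_in_thread():
    # Level 1: a thread inside the current process
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(os.getpid).result()


def pid_in_new_process():
    # Level 2: a brand-new Python interpreter process
    out = subprocess.run(
        [sys.executable, "-c", "import os; print(os.getpid())"],
        capture_output=True,
        text=True,
        check=True,
    )
    return int(out.stdout.strip())


if __name__ == "__main__":
    print(pid_in_thread() == os.getpid())  # True
    print(pid_in_new_process() == os.getpid())  # False
```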
m
Locally I would like each one to be a separate ephemeral container (if I use a Docker work pool); in k8s I would like each one to be a separate k8s pod/job.
n
so I think you may want `run_deployment`, where you have your complex ML pipeline as a flow that you deploy, maybe twice:
• one to `my-local-docker-work-pool`
• one to `prd-kubernetes-work-pool`
and then you have a flow that does:
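```python
import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow
def machine_learning_flow(i: int):
    # my complex ML pipeline, deployed to each work pool
    return i


@flow
async def my_flow():
    list_of_params = [{"i": i} for i in range(10)]  # one parameter dict per run
    # Each run_deployment call creates a separate flow run on the deployment's
    # work pool (its own container or k8s job); by default it waits for that
    # run to finish, and gather lets all of them run at the same time.
    flow_runs = await asyncio.gather(
        *[
            # "<flow name>/<deployment name>"; "deployment" is a placeholder
            run_deployment("machine-learning-flow/deployment", parameters=p)
            for p in list_of_params
        ]
    )
    print(flow_runs)
```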
https://github.com/PrefectHQ/prefect-recipes/tree/main/flows-advanced/parent-orchestrator
m
Or, in other words, what I expected is: if I have 2 workers and I schedule some tasks that can easily run in parallel, I expect ALL THE WORKERS to run at full capacity, executing everything that can be executed in parallel and making the best use of the resources I gave to Prefect.
n
classical workers operate on flow runs, not the tasks within them, which is why I shared the background tasks repo, where you can distribute task runs across horizontally scaled task workers
m
So if I understand correctly, the solution here would be to have 2 flows:
1. The "orchestrating flow", which schedules all the other flows
2. The "executing flow", which actually runs the tasks
n
yeah, that's one way of going about it. basically `run_deployment` is going to schedule some work on distinct infra from the caller, which sounds like roughly what you want. task runners (ray / dask) or background tasks would be alternate ways to achieve that
m
Thanks for the clarification @Nate, it helps! 👏