marrrcin
08/27/2024, 11:52 AM
Can a flow be scheduled on multiple workers to increase parallelism?
Marvin
08/27/2024, 11:52 AM
Yes, to run tasks in parallel across multiple workers you can use a distributed task runner such as the DaskTaskRunner.
Here's a brief example using Dask to achieve this:
python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def compute_task():
    # get_dask_client exposes the flow's Dask cluster as the active client
    with get_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = df.describe().compute()
    return summary_df

@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}))
def dask_flow():
    compute_task()
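If a Dask cluster is already running on separate machines, the runner can attach to it instead of spinning up a temporary local one. A minimal sketch, where the scheduler address is a placeholder for your own:
python
from prefect import flow
from prefect_dask import DaskTaskRunner

# Placeholder scheduler address; replace with your real Dask scheduler
@flow(task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"))
def distributed_flow():
    ...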
The detailed documentation for Dask integration with Prefect is available here.
##### Global Concurrency Limits
You can also manage the number of tasks running in parallel using concurrency limits.
For more information on global concurrency limits, refer to the Prefect documentation on Global Concurrency Limits.
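Related, though distinct from global concurrency limits: task runs can also be throttled by tag. A minimal sketch, assuming a task-run concurrency limit has already been created for the (arbitrary) "database" tag via the UI or CLI:
python
from prefect import flow, task

# Assumes a concurrency limit exists for the "database" tag; Prefect will
# then cap how many "database"-tagged task runs execute at once
@task(tags=["database"])
def query_table(name: str):
    return f"queried {name}"

@flow
def reporting_flow():
    for name in ["users", "orders", "events"]:
        query_table.submit(name)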
If you have any specific configurations or additional queries, feel free to ask!
marrrcin
08/27/2024, 1:23 PM
Marvin
08/27/2024, 1:25 PM
1. Using the ConcurrentTaskRunner
Prefect's ConcurrentTaskRunner runs submitted tasks concurrently within a single flow run, which might be your best option.
To configure it, you can do something like this:
python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task(x):
    return x * 2

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # submit returns PrefectFutures; they resolve when the flow exits,
    # or call .result() on each to block on a value
    results = [my_task.submit(i) for i in range(10)]
    return results

if __name__ == "__main__":
    my_flow()
2. Sub-flows for Parallel Execution
You can break your flow into sub-flows to run parts of your workflow in parallel. Each sub-flow can be independently executed and managed.
python
import asyncio
from prefect import flow, task

@task
def task_a():
    return "result_a"

@task
def task_b():
    return "result_b"

@flow
async def sub_flow():
    result_a = task_a()
    result_b = task_b()
    return result_a, result_b

@flow
async def main_flow():
    # flows have no .submit(); to run sub-flows in parallel, make them
    # async and gather them instead
    return await asyncio.gather(*[sub_flow() for _ in range(5)])

if __name__ == "__main__":
    asyncio.run(main_flow())
3. Using Concurrency Context Manager
Prefect provides a concurrency management system allowing you to control the maximum number of concurrent tasks.
python
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def process_data(data):
    print(f"Processing: {data}")
    return f"Processed: {data}"

@flow
def main():
    data_items = list(range(10))
    processed_data = []
    while data_items:
        # "data-processing" must already exist as a global concurrency
        # limit (created via the UI or CLI) before the flow runs
        with concurrency("data-processing", occupy=3):
            # take at most 3 items, guarding against popping an empty list
            chunk = [data_items.pop() for _ in range(min(3, len(data_items)))]
            processed_data += [process_data.submit(item) for item in chunk]

if __name__ == "__main__":
    main()
You can get more insights on Prefect's task runners from the documentation here.
For further assistance, checking out the documentation on concurrency limits and task runners would be beneficial. If you have any more questions or specifics, please let me know!
marrrcin
08/27/2024, 1:26 PM
Marvin
08/27/2024, 1:27 PM
python
from prefect import flow, task

@task
def my_task():
    return "Task completed."

@flow
def my_flow():
    futures = [my_task.submit() for _ in range(10)]
    return futures

if __name__ == "__main__":
    my_flow()
- Flow Creation: Define your flow and tasks.
- Work Pools Configuration: Create and configure your work pool to match the infrastructure.
- Workers Deployment: Deploy multiple workers, either locally or on the cloud, to poll the work pool (see the sketch after this list).
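To make the work-pool steps concrete, here is a minimal deployment sketch, assuming the Prefect 2.19+/3.x .from_source(...).deploy(...) API, a process-type work pool named my-work-pool, and a placeholder repo and entrypoint:
python
from prefect import flow

# Hypothetical repo URL and entrypoint; point these at your own project
remote_flow = flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows.py:my_flow",
)

# Creates a deployment that any worker polling "my-work-pool" can pick up;
# the pool itself must already exist (created via the UI or CLI)
remote_flow.deploy(name="parallel-workers", work_pool_name="my-work-pool")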
### Tasks in Parallel
When submitting tasks to the flow, the workers in the pool will independently pick up these tasks and execute them in parallel. Prefect ensures that the infrastructure is utilized optimally based on the available workers.
### Relevant Documentation
- Task Runners
- Work Pools
This setup provides a flexible and efficient way to manage and execute tasks across multiple workers, increasing parallelism and optimizing resource utilization.
Nate
08/27/2024, 2:16 PM
Nate
08/27/2024, 2:18 PM
marrrcin
08/27/2024, 2:28 PM
@task()
def machine_learning_pipeline(i):
    # my complex ML pipeline
    return i

@flow()
def my_flow():
    results = []
    for i in range(10):
        # I WANT EACH machine_learning_pipeline TO RUN ON A SEPARATE WORKER
        results.append(
            machine_learning_pipeline.submit(i)
        )
    print(results)
Nate
08/27/2024, 2:28 PM
marrrcin
08/27/2024, 2:29 PM
Nate
08/27/2024, 2:34 PM
run_deployment, where you have your complex ml pipeline as a flow that you deploy, maybe twice
• one to my-local-docker-work-pool
• one to prd-kubernetes-work-pool
and then you have a flow that does
import asyncio
from prefect import flow, task
from prefect.deployments import run_deployment

@task()
def machine_learning_pipeline(i):
    # my complex ML pipeline
    return i

@flow()
async def my_flow():
    list_of_params = [{"i": ...}]  # etc
    flow_runs = await asyncio.gather(
        *[
            run_deployment("machine_learning_flow/deployment", parameters=p)
            for p in list_of_params
        ]
    )
    print(flow_runs)
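To make the two deployments from the bullets above concrete, a rough sketch, again assuming the .from_source(...).deploy(...) API; the repo URL, entrypoint, and image names are placeholders:
python
from prefect import flow

# Hypothetical source for the ML pipeline flow; adjust to your repo
ml_flow = flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows/ml.py:machine_learning_flow",
)

# One deployment per work pool, per the bullets above; docker/kubernetes
# pools will build or pull the given image
ml_flow.deploy(
    name="deployment",
    work_pool_name="my-local-docker-work-pool",
    image="your-org/ml:dev",
)
ml_flow.deploy(
    name="deployment-prd",
    work_pool_name="prd-kubernetes-work-pool",
    image="your-org/ml:prd",
)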
https://github.com/PrefectHQ/prefect-recipes/tree/main/flows-advanced/parent-orchestrator
marrrcin
08/27/2024, 2:34 PM
Nate
08/27/2024, 2:36 PM
marrrcin
08/27/2024, 2:37 PM
Nate
08/27/2024, 2:38 PM
run_deployment is going to schedule some work on distinct infra from the caller, which sounds like roughly what you want
task runners (ray / dask) or background tasks would be alternate ways to achieve that
marrrcin
08/27/2024, 2:39 PM
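As a sketch of the task-runner alternative Nate mentions, the prefect-ray package's RayTaskRunner can point at an existing Ray cluster so submitted tasks land on its workers; the address below is a placeholder:
python
from prefect import flow, task
from prefect_ray import RayTaskRunner  # requires the prefect-ray package

@task
def machine_learning_pipeline(i):
    # stand-in for the complex ML pipeline above
    return i

# Placeholder address; point at your own Ray cluster, or omit it to let
# the runner start a temporary local cluster instead
@flow(task_runner=RayTaskRunner(address="ray://ray-head:10001"))
def my_flow():
    return [machine_learning_pipeline.submit(i) for i in range(10)]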