Malavika S Menon
01/24/2025, 7:21 AM
Marvin
01/24/2025, 7:21 AM
DaskTaskRunner can be configured to connect to an existing Dask cluster or to scale adaptively based on the workload. Here's how you can use these features:
1. Connecting to an Existing Dask Cluster:
You can configure DaskTaskRunner to connect to an existing Dask scheduler by specifying the scheduler's address. This is useful if you have a long-running Dask cluster.
python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(address="tcp://<scheduler-address>"))
def my_flow():
    # Your tasks here
    pass
2. Adaptive Scaling:
The DaskTaskRunner can also scale adaptively between a minimum and maximum number of workers. Pass the range in adapt_kwargs, which is forwarded to the cluster's adapt method when the runner creates a temporary cluster.
python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(adapt_kwargs={"minimum": 1, "maximum": 10}))
def my_adaptive_flow():
    # Your tasks here
    pass
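The two options above are alternatives rather than settings to combine: as far as I know, DaskTaskRunner rejects an address passed together with adapt_kwargs. Here is a minimal sketch of a helper that picks one set of arguments or the other. The name runner_kwargs and its parameters are hypothetical, not part of prefect-dask, and the block only builds a plain dictionary so it runs without a cluster.

```python
def runner_kwargs(address=None, minimum=1, maximum=10):
    """Build keyword arguments for DaskTaskRunner (hypothetical helper).

    With an address, connect to that existing scheduler. Without one,
    request a temporary cluster that adapts between `minimum` and
    `maximum` workers.
    """
    if address:
        # Existing cluster: pass only the address, never adapt_kwargs.
        return {"address": address}
    if minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    return {"adapt_kwargs": {"minimum": minimum, "maximum": maximum}}


print(runner_kwargs(address="tcp://<scheduler-address>"))
print(runner_kwargs(minimum=1, maximum=10))
```

You could then unpack the result into the runner, for example DaskTaskRunner(**runner_kwargs(minimum=1, maximum=10)).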
These examples show how you can leverage Dask's capabilities to manage resources effectively when running Prefect flows.
For more details, refer to the Prefect 2.x Task Runners documentation and the Prefect Dask Task Runners documentation.
Malavika S Menon
01/24/2025, 7:23 AM
Marvin
01/24/2025, 7:23 AM
To use dask.annotate with Prefect, wrap task submissions in it inside your flow to specify task priorities or resource requirements. Here's how you can do it:
1. Priority Annotations:
You can use dask.annotate to set task priorities. Higher priority tasks are scheduled before lower priority ones.
python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)

@flow(task_runner=DaskTaskRunner())
def my_flow():
    # Submit the tasks so they run on the Dask task runner
    with dask.annotate(priority=-10):
        future = show.submit(1)  # low priority task
    with dask.annotate(priority=10):
        future = show.submit(2)  # high priority task
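To make the ordering rule concrete, here is a toy model of priority scheduling. It is not Dask's scheduler, only a sketch that assumes every task is already waiting for a single free worker; schedule_order is a hypothetical name. In a real cluster, priority only matters when tasks compete for a worker, so a low priority task submitted while a worker is idle can still start first.

```python
import heapq


def schedule_order(tasks):
    """Return task names in the order a single worker would run them.

    `tasks` is a list of (name, priority) pairs in submission order.
    """
    heap = []
    for seq, (name, priority) in enumerate(tasks):
        # heapq is a min-heap, so negate the priority to pop the highest
        # first; seq keeps submission order among equal priorities.
        heapq.heappush(heap, (-priority, seq, name))
    order = []
    while heap:
        _, _, name = heapq.heappop(heap)
        order.append(name)
    return order


print(schedule_order([("show(1)", -10), ("show(2)", 10)]))
# prints ['show(2)', 'show(1)']
```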
2. Resource Annotations:
You can also specify the resources a task requires using dask.annotate.
python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    print(x)

# One worker that advertises 1 "GPU" and 1 "process" resource
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    with dask.annotate(resources={'GPU': 1}):
        future = show.submit(0)  # this task requires 1 GPU resource
    with dask.annotate(resources={'process': 1}):
        # these tasks require 1 process each
        future = show.submit(1)
        future = show.submit(2)
        future = show.submit(3)
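One thing to watch for: a task that asks for a resource no worker advertises is never scheduled, so the flow appears to hang. Here is a minimal sketch of a pre-flight check in plain Python. The function unsatisfiable and the "big" task are hypothetical; the worker resources mirror the cluster_kwargs above.

```python
def unsatisfiable(worker_resources, n_workers, task_requirements):
    """Return the names of tasks that no single worker could ever run.

    `worker_resources` is what each worker advertises, and
    `task_requirements` maps a task name to the resources it requests.
    """
    blocked = []
    for name, needs in task_requirements.items():
        fits = n_workers > 0 and all(
            worker_resources.get(resource, 0) >= amount
            for resource, amount in needs.items()
        )
        if not fits:
            blocked.append(name)
    return blocked


workers = {"GPU": 1, "process": 1}
tasks = {
    "show(0)": {"GPU": 1},
    "show(1)": {"process": 1},
    "big": {"GPU": 2},  # hypothetical task that asks for too much
}
print(unsatisfiable(workers, 1, tasks))  # prints ['big']
```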
These annotations help manage and allocate resources efficiently within a Dask cluster when running Prefect flows. For further details, refer to the Prefect Dask Usage Guide.