Viktor
07/15/2025, 6:56 PM

Marvin
07/15/2025, 6:56 PM
When you specify a task runner like `DaskTaskRunner` for a flow, it will handle the execution of all tasks in that flow.
Here's a basic example to illustrate:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def task1():
    return 1


@task
def task2():
    return 2


@flow(task_runner=DaskTaskRunner())
def my_flow():
    # .submit() hands each task to the flow's task runner (Dask here)
    future1 = task1.submit()
    future2 = task2.submit()
    return future1.result(), future2.result()
```
If you need different execution patterns for different tasks, you have a few alternatives:
1. Create separate flows with different task runners and call them as subflows (sketched below)
2. Use the task runner that meets the needs of your most demanding tasks
3. Implement custom logic within your tasks to handle specific execution requirements
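For option 1, here's a minimal sketch. It assumes Prefect 3.x, where `ThreadPoolTaskRunner` lives in `prefect.task_runners`; on Prefect 2.x you would use `ConcurrentTaskRunner` instead. The flow and task names are illustrative:

```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect_dask import DaskTaskRunner


@task
def heavy_task():
    return "ran on Dask"


@task
def light_task():
    return "ran on threads"


@flow(task_runner=DaskTaskRunner())
def dask_subflow():
    return heavy_task.submit().result()


@flow(task_runner=ThreadPoolTaskRunner())
def threaded_subflow():
    return light_task.submit().result()


@flow
def parent_flow():
    # Each subflow's tasks run on that subflow's own task runner
    return dask_subflow(), threaded_subflow()
```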
The task runner you choose will be responsible for executing all tasks within its flow context. You can find more information about task runners in the Prefect documentation.

Viktor
07/15/2025, 6:59 PM

Marvin
07/15/2025, 7:00 PM
```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(
    name="generate_creatordb",
    task_runner=DaskTaskRunner(
        cluster_kwargs={
            "n_workers": 3,  # initial worker count; adaptive scaling adjusts from here
            "threads_per_worker": 4,
            "memory_limit": "100GB",
        },
        adapt_kwargs={  # adaptive scaling belongs in adapt_kwargs, not cluster_kwargs
            "minimum": 1,
            "maximum": 3,
        },
    ),
)
def generate_creatordb():
    ...
```
The key differences from your original syntax are:
1. Adaptive scaling configuration should be in `adapt_kwargs` instead of nested inside `cluster_kwargs`
2. The `processes` parameter isn't typically needed, since it's the default setting
3. Worker count can be set directly with `n_workers` in `cluster_kwargs`
This configuration will:
- Allow up to 4 threads per worker
- Set a 100GB memory limit per worker
- Enable adaptive scaling between 1 and 3 workers
- Maintain the same basic functionality you were aiming for
If you need to use a specific Dask cluster class or have additional client configuration needs, you can also specify those with the `cluster_class` and `client_kwargs` parameters.
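Here's a minimal sketch of those two parameters (the specific cluster class and client options are illustrative; substitute whatever your environment uses):

```python
from prefect import flow
from prefect_dask import DaskTaskRunner

runner = DaskTaskRunner(
    # cluster_class takes an import path (or callable) for the cluster type
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 4},
    # client_kwargs are passed through to the Dask client
    client_kwargs={"set_as_default": True},
)


@flow(task_runner=runner)
def my_flow():
    ...
```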
For more complex Dask configurations, you might want to consider creating a DaskTaskRunner configuration as a block, which allows for more detailed configuration options and reusability.

Viktor
07/15/2025, 7:01 PM

Marvin
07/15/2025, 7:01 PM
```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(
    name="generate_creatordb",
    task_runner=DaskTaskRunner(
        cluster_kwargs={
            "threads_per_worker": 4,
            "memory_limit": "35GB",  # adjusted down per worker
        },
        adapt_kwargs={
            "minimum": 1,
            "maximum": 3,
        },
    ),
)
def generate_creatordb():
    ...
```
Why these numbers?
- Setting `memory_limit="35GB"` per worker means:
  - 3 workers × 35GB = 105GB total maximum usage
  - Leaves ~23GB for the operating system, Dask scheduler, and other processes
  - Provides a safety margin to prevent OOM situations
Some tips:
1. Always leave some memory for:
   - Operating system
   - Dask scheduler
   - Other system processes
   - Memory spikes
2. A good rule of thumb is to allocate about 75-80% of total available RAM to your workers
3. Consider setting memory limits based on your actual data processing needs rather than the maximum available
If you need to process larger datasets, consider:
1. Processing data in chunks (see the sketch after this list)
2. Using disk-based processing methods
3. Reducing the number of concurrent workers
4. Using a larger server or distributed cluster
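To illustrate option 1, here's a minimal chunking sketch (the `process_chunk` task and the chunk size are hypothetical placeholders for your actual workload):

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def process_chunk(chunk: list) -> int:
    # placeholder: replace with your real per-chunk processing
    return sum(chunk)


@flow(task_runner=DaskTaskRunner())
def chunked_flow(data: list, chunk_size: int = 1000):
    # submit one task per chunk so each worker holds only a slice in memory
    futures = [
        process_chunk.submit(data[i:i + chunk_size])
        for i in range(0, len(data), chunk_size)
    ]
    return [f.result() for f in futures]
```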