Alec Thomson
07/24/2024, 8:34 AM

Marvin
07/24/2024, 8:34 AM
1. Create a Work Pool:
bash
prefect work-pool create --type <infrastructure-type> my-dask-work-pool
Replace <infrastructure-type> with the type of infrastructure that can execute runs from this work pool.
2. Configure the Dask Task Runner:
To configure the Dask task runner, pass the address of the Dask scheduler to the DaskTaskRunner. You can also use adaptive scaling by specifying adapt_kwargs (see the adaptive-scaling sketch after the example below).
Here’s an example of configuring the Dask task runner within a flow:
python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def my_task():
    print("Executing task...")

@flow(task_runner=DaskTaskRunner(address="tcp://scheduler-address:8786"))
def my_flow():
    my_task()

my_flow()
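If you want the task runner to create a temporary Dask cluster for the flow run and scale it adaptively, rather than connecting to an existing scheduler, you can pass cluster_kwargs and adapt_kwargs instead of an address. A minimal sketch; the worker counts below are illustrative:
python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def my_task():
    print("Executing task...")

# No address: the task runner creates a temporary local Dask cluster for this
# flow run and lets Dask's adaptive scaling keep it between 1 and 5 workers.
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1},
        adapt_kwargs={"minimum": 1, "maximum": 5},
    )
)
def my_adaptive_flow():
    my_task()

my_adaptive_flow()
Note that adapt_kwargs only applies to clusters the task runner creates itself; when you pass an existing scheduler address, sizing is managed by that cluster.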
3. Start a Worker:
Start a worker that connects to the work pool:
bash
prefect worker start -p my-dask-work-pool
For more detailed information, refer to the relevant Prefect documentation:
- Work Pools: https://docs.prefect.io/latest/concepts/work-pools/
- Task Runners: https://docs.prefect.io/latest/concepts/task-runners/
You can also check the issues related to Dask on GitHub for common problems and solutions:
- Flow cancellation not deleting Dask Scheduler & Worker pods
- Adaptive Dask Executor workers not scaling up
- Purge old Dask futures

Alec Thomson
07/24/2024, 8:36 AM

Marvin
07/24/2024, 8:36 AM
2. Create a Work Pool:
bash
prefect work-pool create --type custom my-hpc-dask-pool
3. Configuring the Dask Task Runner:
Ensure the Dask task runner connects to the scheduler address of your HPC cluster (see the dask-jobqueue sketch after this example).
python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def my_task():
    print("Executing task...")

@flow(task_runner=DaskTaskRunner(address="tcp://scheduler-address:8786"))
def my_flow():
    my_task()

my_flow()
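If the HPC cluster uses a batch scheduler such as SLURM, an alternative to pointing at a long-running scheduler is to let the task runner create a dask-jobqueue cluster per flow run. A rough sketch, assuming the dask-jobqueue package is installed; the queue name and resources are placeholders you'd replace with your site's values:
python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def my_task():
    print("Executing task...")

# Each flow run gets its own dask_jobqueue.SLURMCluster; every Dask worker is
# submitted as a SLURM job. Queue, cores, and memory are placeholder values.
@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.SLURMCluster",
        cluster_kwargs={"queue": "general", "cores": 4, "memory": "16GB"},
        adapt_kwargs={"minimum": 1, "maximum": 10},
    )
)
def my_hpc_flow():
    my_task()

my_hpc_flow()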
4. Starting the Worker:
Start a worker that connects to the HPC cluster:
bash
prefect worker start -p my-hpc-dask-pool
This setup enables your self-hosted Prefect UI to manage tasks executed on your HPC cluster.
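One practical caveat: the worker, and the flow runs it launches on the HPC side, must be able to reach the self-hosted Prefect API over the network (typically by setting PREFECT_API_URL on the HPC nodes). A small sketch for checking which API URL the current environment is pointed at:
python
# Prints the API URL this environment's Prefect profile is configured to use,
# e.g. http://<your-vm-host>:4200/api for a default self-hosted server.
from prefect.settings import PREFECT_API_URL

print(PREFECT_API_URL.value())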
For further details, refer to:
- Work Pools: Prefect Work Pools
- Hosting Prefect: Host a Prefect Server

Alec Thomson
07/24/2024, 8:37 AM

Marvin
07/24/2024, 8:38 AM
1. Create a Deployment:
python
from prefect.deployments import Deployment
from your_flow_module import my_flow  # Import your flow

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow-deployment",
    work_pool_name="my-hpc-dask-pool",  # Use the name of your work pool
)
deployment.apply()
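Deployment.build_from_flow is the older Python deployment API; recent Prefect 2.x releases also support deploying straight from a flow object pulled from source control. A rough sketch, assuming your flow lives in a Git repository the worker can clone (the repo URL and entrypoint below are placeholders):
python
from prefect import flow

# Pull the flow code from a (placeholder) Git repo and register a deployment
# against the same work pool; the worker clones the repo at run time.
flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="my-flow-deployment",
    work_pool_name="my-hpc-dask-pool",
)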
2. Run the Worker:
Start a worker that connects to the work pool:
bash
prefect worker start -p my-hpc-dask-pool
3. Trigger the Flow Run:
You can trigger a flow run from the Prefect UI or the CLI. To use the CLI:
bash
prefect deployment run 'my-flow/my-flow-deployment'
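You can also trigger the deployment from Python with run_deployment; a minimal sketch, assuming the flow and deployment names used above:
python
from prefect.deployments import run_deployment

# Creates a flow run for the deployment; by default this waits for the run to
# finish. Pass timeout=0 to submit the run and return immediately.
flow_run = run_deployment(name="my-flow/my-flow-deployment")
print(flow_run.state)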
This setup ensures that your flow is deployed to the specified work pool and can be executed by the worker.
For additional guidance on flow deployments and using workers, refer to the relevant Prefect tutorials:
- Workers: https://docs.prefect.io/latest/tutorial/workers/
- Deployments: https://docs.prefect.io/latest/concepts/deployments/
The Prefect async workflow example can provide additional context on deploying and running flows.

Alec Thomson
07/24/2024, 8:39 AM
Unknown work pool type 'custom'. Please choose from azure-container-instance, block, cloud-run, cloud-run-v2,
docker, ecs, kubernetes, prefect-agent, process, vertex-ai.

Marvin
07/24/2024, 8:39 AM
The custom work pool type isn't recognized by Prefect. Given your setup, with the Prefect UI on a self-hosted VM and Dask on an HPC cluster, you likely want the process work pool type for executing on local infrastructure, or the kubernetes type if you manage your Dask cluster with Kubernetes.
Here's how to proceed:
1. Create a Work Pool:
If you're using local infrastructure, use the process type:
bash
prefect work-pool create --type process my-hpc-dask-pool
If you're using Kubernetes, use the kubernetes type:
bash
prefect work-pool create --type kubernetes my-hpc-dask-pool
2. Deploy Your Flow:
Create a deployment for your flow specifying the correct work pool:
python
from prefect.deployments import Deployment
from your_flow_module import my_flow  # Import your flow

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow-deployment",
    work_pool_name="my-hpc-dask-pool",
)
deployment.apply()
3. Run the Worker:
Start a worker that connects to the specified work pool:
bash
prefect worker start -p my-hpc-dask-pool
4. Trigger the Flow Run:
Trigger the flow run from the Prefect UI or via CLI:
bash
prefect deployment run 'my-flow/my-flow-deployment'
Choose the process type if you are running on self-hosted VMs and the kubernetes type if you are leveraging Kubernetes on your HPC cluster.
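To double-check which work pools exist and what type they are, you can run prefect work-pool ls, or query the API from Python; a minimal sketch using the Prefect client:
python
import asyncio

from prefect.client.orchestration import get_client

async def list_work_pools():
    # Read every work pool from the (self-hosted) Prefect API and print name/type.
    async with get_client() as client:
        for pool in await client.read_work_pools():
            print(pool.name, pool.type)

asyncio.run(list_work_pools())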
For more details on work pools and workers, refer to:
- Work Pools: https://docs.prefect.io/latest/concepts/work-pools/
- Deployments: https://docs.prefect.io/latest/concepts/deployments/