Cody
09/23/2023, 2:11 AM

Marvin
09/23/2023, 2:12 AM
```python
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
from prefect_dask.task_runners import DaskTaskRunner

@task
def hello_local():
    print("Hello!")

@task
def hello_dask():
    print("Hello from Dask!")

# Parent flow: runs its tasks one at a time with the SequentialTaskRunner.
@flow(task_runner=SequentialTaskRunner())
def sequential_flow():
    hello_local.submit()
    dask_subflow()
    hello_local.submit()

# Subflow: submits its task to a temporary Dask cluster.
@flow(task_runner=DaskTaskRunner())
def dask_subflow():
    hello_dask.submit()

if __name__ == "__main__":
    sequential_flow()
```
In this example, `sequential_flow` runs its tasks locally using the `SequentialTaskRunner`. It calls `dask_subflow`, which runs the `hello_dask` task in parallel on a Dask cluster using the `DaskTaskRunner`.
You can also specify resources for your Dask cluster like this:
```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    # Task body assumed for completeness: just print the input.
    print(x)

@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    with dask.annotate(resources={"GPU": 1}):
        future = show.submit(0)  # this task requires 1 GPU resource on a worker

    with dask.annotate(resources={"process": 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially
        future = show.submit(1)
        future = show.submit(2)
        future = show.submit(3)

if __name__ == "__main__":
    my_flow()
```
In this flow, tasks are annotated with the resources they require. The `show(0)` task requires 1 GPU resource, and the `show(1)`, `show(2)`, and `show(3)` tasks each require 1 process. Because the cluster is specified to have 1 process per worker and 1 worker, those tasks will run sequentially.
And remember, if you're feeling overwhelmed, don't worry. It's only a matter of time before the universe ends.

Cody
09/23/2023, 2:31 AM

Marvin
09/23/2023, 2:31 AM
The DaskTaskRunner does not accept a `wait_for` argument. It does, however, accept an `address` argument to connect to an existing Dask cluster, and it can be configured to scale adaptively to the workload by specifying a minimum and maximum number of workers.
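A minimal sketch of how those options might be passed (the scheduler address and worker counts below are placeholders, not values from this conversation):

```python
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

# Connect to an existing Dask cluster by its scheduler address
# (placeholder address).
@flow(task_runner=DaskTaskRunner(address="tcp://dask-scheduler:8786"))
def flow_on_existing_cluster():
    ...

# Or let a temporary cluster scale adaptively between a minimum and
# maximum number of workers (example values).
@flow(task_runner=DaskTaskRunner(adapt_kwargs={"minimum": 1, "maximum": 10}))
def adaptive_flow():
    ...
```

If you do need `wait_for`, it belongs on `Task.submit()` (e.g. `downstream.submit(wait_for=[upstream_future])`) rather than on the task runner.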
But, who knows? Maybe in some parallel universe, the DaskTaskRunner does accept a `wait_for` argument. Unfortunately, we're stuck in this one.