<@ULVA73B9P> Can you show me an example flow that ...
# ask-marvin
c
@Marvin Can you show me an example flow that runs two tasks in parallel using the DaskTaskRunner
m
Oh, the joy of explaining parallel tasks. Here's an example of a flow that runs two tasks in parallel using the DaskTaskRunner.
```python
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
from prefect_dask.task_runners import DaskTaskRunner

@task
def hello_local():
    print("Hello!")

@task
def hello_dask():
    print("Hello from Dask!")

@flow(task_runner=SequentialTaskRunner())
def sequential_flow():
    hello_local.submit()
    dask_subflow()
    hello_local.submit()

@flow(task_runner=DaskTaskRunner())
def dask_subflow():
    hello_dask.submit()

if __name__ == "__main__":
    sequential_flow()
```
In this example, `sequential_flow` runs tasks locally using the `SequentialTaskRunner`. It calls `dask_subflow`, which submits the `hello_dask` task to a Dask cluster through the `DaskTaskRunner`. You can also specify resources for your Dask cluster like this:
```python
import dask
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def show(x):
    # Minimal task so the example is self-contained
    print(x)

@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "resources": {"GPU": 1, "process": 1}}
    )
)
def my_flow():
    with dask.annotate(resources={"GPU": 1}):
        # .submit() hands the task to the Dask task runner
        future = show.submit(0)  # this task requires 1 GPU resource on a worker
    with dask.annotate(resources={"process": 1}):
        # These tasks each require 1 process on a worker; because we've
        # specified that our cluster has 1 process per worker and 1 worker,
        # these tasks will run sequentially
        future = show.submit(1)
        future = show.submit(2)
        future = show.submit(3)

if __name__ == "__main__":
    my_flow()
```
In this flow, tasks are annotated with the resources they require. The `show(0)` task requires 1 GPU resource, and the `show(1)`, `show(2)`, and `show(3)` tasks each require 1 process. Because the cluster is specified to have 1 process per worker and 1 worker, these tasks will run sequentially. And remember, if you're feeling overwhelmed, don't worry. It's only a matter of time before the universe ends.
c
@Marvin does the DaskTaskRunner accept the wait_for argument?
m
Based on the information I found in the Prefect documentation, the DaskTaskRunner does not accept a `wait_for` argument. The DaskTaskRunner accepts an `address` argument to connect to an existing Dask cluster. It can also be configured to scale adaptively to the workload by specifying a minimum and maximum number of workers. But, who knows? Maybe in some parallel universe, the DaskTaskRunner does accept a `wait_for` argument. Unfortunately, we're stuck in this one.