<@ULVA73B9P> When using the DaskTaskRunner why do ...
# ask-marvin
c
@Marvin When using the DaskTaskRunner why do I get tornado.iostream.StreamClosedError: Stream is closed even though the flow completes without error
m
The `tornado.iostream.StreamClosedError: Stream is closed` warning appears to be a benign issue in Dask's underlying communication layer. Although it triggers a warning, it does not affect the execution of your flow, and it stems from Dask's internals rather than from Prefect itself. For reference, see the notes on using Dask with Prefect in Prefect's Task Runners documentation, and consider searching for or reporting the issue on Dask's GitHub repository if it persists or causes real problems. If the error does affect your workflows, consider updating both Dask and Prefect to their latest versions.
c
@Marvin How would I dynamically pass parameters to DaskTaskRunner
m
To dynamically pass parameters to `DaskTaskRunner`, you can define these parameters within the flow function. Here's an example:
python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def hello_task(name: str):
    print(f"Hello, {name}!")

@flow
def dynamic_dask_flow(dynamic_params: dict):
    dask_task_runner = DaskTaskRunner(**dynamic_params)
    for name in ["arthur", "trillian", "ford", "marvin"]:
        hello_task.submit(name, task_runner=dask_task_runner)

if __name__ == "__main__":
    dynamic_params = {"cluster_kwargs": {"n_workers": 2, "threads_per_worker": 2}}
    dynamic_dask_flow(dynamic_params)
In this example:
1. Parameters for `DaskTaskRunner` are passed as a dictionary (`dynamic_params`).
2. The `DaskTaskRunner` is instantiated within the flow using these dynamic parameters.
For further details, you can explore the complete example in this code link.
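When the parameter dictionary comes from outside the flow, it can help to check its keys before unpacking it into the constructor. The following is a minimal stdlib-only sketch; `check_runner_params` and `ALLOWED_RUNNER_KEYS` are hypothetical names introduced here, and the listed keyword names are an assumption about what `DaskTaskRunner` accepts, so verify them against your installed prefect-dask version.

```python
# Keyword names assumed to be accepted by DaskTaskRunner's constructor.
# This list is an assumption for illustration; check your prefect-dask version.
ALLOWED_RUNNER_KEYS = {
    "address",
    "cluster_class",
    "cluster_kwargs",
    "adapt_kwargs",
    "client_kwargs",
}


def check_runner_params(params: dict) -> dict:
    """Return a copy of params, raising ValueError on unrecognized keys."""
    unknown = sorted(set(params) - ALLOWED_RUNNER_KEYS)
    if unknown:
        raise ValueError(f"Unsupported DaskTaskRunner parameters: {unknown}")
    return dict(params)


dynamic_params = {"cluster_kwargs": {"n_workers": 2, "threads_per_worker": 2}}
checked = check_runner_params(dynamic_params)
print(checked)
```

The checked dictionary could then be unpacked as `DaskTaskRunner(**checked)`, so a misspelled key fails early with a readable message.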
c
@Marvin prefect.exceptions.ParameterBindError: Error binding parameters for function 'hello_task': got an unexpected keyword argument 'task_runner'. Function 'hello_task' has signature 'name: str' but received args: ('arthur',) and kwargs: ['task_runner'].
m
It appears I made an error in the last example by trying to pass the `task_runner` directly to the task. The `task_runner` argument applies only at the flow level, not to individual tasks, so you can't pass the task runner to `task.submit`. Here is how to dynamically pass parameters to the `DaskTaskRunner` at the flow level:
python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def hello_task(name: str):
    print(f"Hello, {name}!")

@flow
def dynamic_dask_flow():
    # Submit every task first, then wait so all runs finish before the flow returns
    futures = [hello_task.submit(name) for name in ["arthur", "trillian", "ford", "marvin"]]
    for future in futures:
        future.wait()

if __name__ == "__main__":
    cluster_kwargs = {"n_workers": 2, "threads_per_worker": 2}
    # A runner created inside the flow body is never used by the flow,
    # so build it here and attach it with with_options (returns a configured copy)
    runner = DaskTaskRunner(cluster_kwargs=cluster_kwargs)
    dynamic_dask_flow.with_options(task_runner=runner)()
In this example:
1. Parameters for `DaskTaskRunner` are passed as `cluster_kwargs`.
2. The `DaskTaskRunner` is attached at the flow level with `with_options`, so tasks submitted inside the flow run on it.
For more details on configuring `DaskTaskRunner`, refer to Prefect Task Runners documentation.
c
@Marvin how do I get information about the created cluster such as the dashboard_link
m
To retrieve information about the created Dask cluster, such as the dashboard link, you need to access the `DaskTaskRunner` object after it has been initialized and started. Here is an example of how to do this:
python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def hello_task(name: str):
    print(f"Hello, {name}!")

@flow
def dynamic_dask_flow(cluster_kwargs: dict):
    dask_task_runner = DaskTaskRunner(cluster_kwargs=cluster_kwargs)
    
    # Start the DaskTaskRunner to initialize the cluster
    with dask_task_runner.start():
        # The cluster object can now be accessed via dask_task_runner.cluster
        cluster = dask_task_runner.cluster
        print(f"Dask dashboard is available at: {cluster.dashboard_link}")
        
        for name in ["arthur", "trillian", "ford", "marvin"]:
            hello_task.submit(name)

if __name__ == "__main__":
    cluster_kwargs = {"n_workers": 2, "threads_per_worker": 2}
    dynamic_dask_flow(cluster_kwargs=cluster_kwargs)
In this example:
1. The `DaskTaskRunner` is created with `cluster_kwargs`.
2. The task runner is started using the `with dask_task_runner.start()` context manager.
3. The cluster object is accessed via `dask_task_runner.cluster`.
4. The `dashboard_link` attribute of the cluster object is printed to get the URL of the Dask dashboard.
Here is the reference from the Prefect Dask Task Runners documentation.
c
@Marvin TypeError: '_AsyncGeneratorContextManager' object does not support the context manager protocol
m
Apologies for the confusion. You need to use the proper method to access the cluster information when using `DaskTaskRunner`. Unfortunately, the `start` method cannot be used directly as a context manager. Instead, you have to await it and retrieve the cluster information after the `flow` has been run. Here's the corrected approach:
python
from prefect import task, flow
from prefect_dask.task_runners import DaskTaskRunner

@task
def hello_task(name: str):
    print(f"Hello, {name}!")

@flow
async def dynamic_dask_flow(cluster_kwargs: dict):
    dask_task_runner = DaskTaskRunner(cluster_kwargs=cluster_kwargs)
    
    # Start the DaskTaskRunner
    await dask_task_runner.start()
    
    # Access the cluster object
    cluster = dask_task_runner.cluster
    print(f"Dask dashboard is available at: {cluster.dashboard_link}")

    # Submit tasks
    for name in ["arthur", "trillian", "ford", "marvin"]:
        hello_task.submit(name)
    
    # Shut down the DaskTaskRunner
    await dask_task_runner.shutdown()

if __name__ == "__main__":
    import asyncio
    cluster_kwargs = {"n_workers": 2, "threads_per_worker": 2}
    asyncio.run(dynamic_dask_flow(cluster_kwargs=cluster_kwargs))
In this corrected version:
1. The `flow` function is defined as `async`.
2. The `DaskTaskRunner` is started and awaited using `await dask_task_runner.start()`.
3. The cluster object is retrieved and its `dashboard_link` is printed.
4. After submitting tasks, the task runner is shut down using `await dask_task_runner.shutdown()`.
For further details on using `DaskTaskRunner`, refer to the Prefect Dask Task Runners documentation.
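The `TypeError` reported earlier names `_AsyncGeneratorContextManager`, the type that `contextlib.asynccontextmanager` returns. The stdlib-only sketch below uses a hypothetical `start()` as a stand-in (it is not Prefect's implementation) to show how such an object behaves: plain `with` and `await` are both rejected, and only `async with` enters it.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def start():
    """Hypothetical stand-in for a start() built with asynccontextmanager."""
    yield "started"


async def main() -> dict:
    outcomes = {}

    # Plain `with` is rejected: the object has no __enter__/__exit__.
    # Older Python versions raise AttributeError here instead of TypeError.
    try:
        with start():
            pass
    except (TypeError, AttributeError) as exc:
        outcomes["with"] = type(exc).__name__

    # `await` is rejected too: the object is a context manager, not an awaitable.
    try:
        await start()
    except TypeError as exc:
        outcomes["await"] = type(exc).__name__

    # `async with` is the only form that enters the context.
    async with start() as value:
        outcomes["async with"] = value

    return outcomes


outcomes = asyncio.run(main())
print(outcomes)
```

If a task runner's `start()` is built this way, neither wrapping it in `with` nor awaiting it can work, regardless of whether the surrounding flow is sync or async.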
c
@Marvin TypeError: object _AsyncGeneratorContextManager can't be used in 'await' expression
m
To access cluster information, such as the `dashboard_link`, during the execution of a flow with `DaskTaskRunner`, you can accomplish this by deriving a custom task runner class. This task runner will capture cluster details and make them accessible. Unfortunately, the Prefect API does not provide a direct or documented way to extract this information otherwise. Here is an example of how you can derive a custom `DaskTaskRunner` to achieve this:
python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

class CustomDaskTaskRunner(DaskTaskRunner):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.cluster = None

    async def start(self):
        await super().start()
        self.cluster = self._client.cluster

@task
def hello_task(name: str):
    print(f"Hello, {name}!")

@flow(task_runner=CustomDaskTaskRunner(cluster_kwargs={"n_workers": 2, "threads_per_worker": 2}))
async def dynamic_dask_flow():
    task_runner = CustomDaskTaskRunner()
    await task_runner.start()

    cluster = task_runner.cluster
    if cluster:
        print(f"Dask dashboard is available at: {cluster.dashboard_link}")

    for name in ["arthur", "trillian", "ford", "marvin"]:
        hello_task.submit(name)

    await task_runner.shutdown()

if __name__ == "__main__":
    import asyncio
    asyncio.run(dynamic_dask_flow())
This example defines a `CustomDaskTaskRunner` class that saves the created cluster. Then in the flow, you can access the cluster's `dashboard_link`. For further details, refer to the Prefect Dask Task Runners documentation.