Craig Rose
07/15/2024, 6:22 AM
Marvin
07/15/2024, 6:30 AM
The `tornado.iostream.StreamClosedError: Stream is closed` warning appears to be benign. It comes from Dask's underlying communication layer rather than from Prefect itself, and although it is reported as a warning, it does not affect the execution of your flow.
For background on how Prefect uses Dask, see Prefect's Task Runners documentation. If the warning persists or starts causing real failures, consider searching for it, or reporting it, on Dask's GitHub repository.
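Before reporting upstream or upgrading, it helps to know which versions are actually installed. Here is a minimal sketch using only the standard library; the distribution names in the usage block (`prefect`, `prefect-dask`, `dask`, `distributed`) are assumptions about a typical environment, and `installed_versions` is a hypothetical helper name.

```python
from importlib import metadata


def installed_versions(distributions):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in distributions:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # Not installed in this environment.
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Assumed distribution names; adjust to match your environment.
    names = ["prefect", "prefect-dask", "dask", "distributed"]
    for name, version in installed_versions(names).items():
        print(f"{name}: {version or 'not installed'}")
```

Including this output in a bug report makes it easier for maintainers to tell whether the warning is tied to a particular release.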
If the error does affect your workflows, consider updating both Dask and Prefect to their latest versions.
Craig Rose
07/16/2024, 1:41 AM
Marvin
07/16/2024, 1:41 AM
To pass parameters to `DaskTaskRunner` dynamically, you can define these parameters within the flow function. Here’s an example:
```python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def hello_task(name: str):
    print(f"Hello, {name}!")


@flow
def dynamic_dask_flow(dynamic_params: dict):
    dask_task_runner = DaskTaskRunner(**dynamic_params)
    for name in ["arthur", "trillian", "ford", "marvin"]:
        hello_task.submit(name, task_runner=dask_task_runner)


if __name__ == "__main__":
    dynamic_params = {"cluster_kwargs": {"n_workers": 2, "threads_per_worker": 2}}
    dynamic_dask_flow(dynamic_params)
```
In this example:
1. Parameters for `DaskTaskRunner` are passed as a dictionary (`dynamic_params`).
2. The `DaskTaskRunner` is instantiated within the flow using these dynamic parameters.
For further details, you can explore the complete example in this code link.
Craig Rose
07/16/2024, 1:51 AM
Marvin
07/16/2024, 1:51 AM
You cannot pass `task_runner` directly to the task. The `task_runner` argument applies only at the flow level, not to individual tasks, so it cannot be passed to `task.submit`.
Here is how to dynamically pass parameters to the `DaskTaskRunner` at the flow level:
```python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def hello_task(name: str):
    print(f"Hello, {name}!")


@flow
def dynamic_dask_flow(cluster_kwargs: dict):
    dask_task_runner = DaskTaskRunner(cluster_kwargs=cluster_kwargs)
    for name in ["arthur", "trillian", "ford", "marvin"]:
        hello_task.submit(name)


if __name__ == "__main__":
    cluster_kwargs = {"n_workers": 2, "threads_per_worker": 2}
    dynamic_dask_flow(cluster_kwargs=cluster_kwargs)
```
In this example:
1. Parameters for `DaskTaskRunner` are passed as `cluster_kwargs`.
2. The `DaskTaskRunner` is instantiated at the flow level using these parameters.
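One caveat about the example above: the `DaskTaskRunner` created inside the flow body is never attached to the flow, so it has no effect on how tasks run. A minimal sketch of one way to attach a runner at call time uses `with_options` on the flow. The helper names `make_cluster_kwargs` and `run_on_dask` are hypothetical, and the Prefect imports are deferred so the first helper works even without Prefect installed.

```python
def make_cluster_kwargs(n_workers: int = 2, threads_per_worker: int = 2) -> dict:
    """Build the dict forwarded to the Dask cluster constructor."""
    if n_workers < 1 or threads_per_worker < 1:
        raise ValueError("n_workers and threads_per_worker must be >= 1")
    return {"n_workers": n_workers, "threads_per_worker": threads_per_worker}


def run_on_dask(cluster_kwargs: dict) -> None:
    """Run a small flow with a DaskTaskRunner configured at call time."""
    # Imported lazily; requires prefect and prefect-dask to be installed.
    from prefect import flow, task
    from prefect_dask.task_runners import DaskTaskRunner

    @task
    def hello_task(name: str):
        print(f"Hello, {name}!")

    @flow
    def dynamic_dask_flow():
        futures = [
            hello_task.submit(name)
            for name in ["arthur", "trillian", "ford", "marvin"]
        ]
        # Wait explicitly so the flow does not return before tasks finish.
        for future in futures:
            future.wait()

    # with_options returns a copy of the flow bound to the given task runner.
    configured_flow = dynamic_dask_flow.with_options(
        task_runner=DaskTaskRunner(cluster_kwargs=cluster_kwargs)
    )
    configured_flow()
```

With Prefect and prefect-dask installed, calling `run_on_dask(make_cluster_kwargs(n_workers=2, threads_per_worker=2))` should run the four tasks on a temporary two-worker cluster.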
For more details on configuring `DaskTaskRunner`, refer to the Prefect Task Runners documentation.
Craig Rose
07/16/2024, 2:05 AM
Marvin
07/16/2024, 2:05 AM
You can access the cluster through the `DaskTaskRunner` object after it has been initialized and started. Here is an example of how to do this:
```python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def hello_task(name: str):
    print(f"Hello, {name}!")


@flow
def dynamic_dask_flow(cluster_kwargs: dict):
    dask_task_runner = DaskTaskRunner(cluster_kwargs=cluster_kwargs)
    # Start the DaskTaskRunner to initialize the cluster
    with dask_task_runner.start():
        # The cluster object can now be accessed via dask_task_runner.cluster
        cluster = dask_task_runner.cluster
        print(f"Dask dashboard is available at: {cluster.dashboard_link}")
        for name in ["arthur", "trillian", "ford", "marvin"]:
            hello_task.submit(name)


if __name__ == "__main__":
    cluster_kwargs = {"n_workers": 2, "threads_per_worker": 2}
    dynamic_dask_flow(cluster_kwargs=cluster_kwargs)
```
In this example:
1. The `DaskTaskRunner` is created with `cluster_kwargs`.
2. The task runner is started using the `with dask_task_runner.start()` context manager.
3. The cluster object is accessed via `dask_task_runner.cluster`.
4. The `dashboard_link` attribute of the cluster object is printed to get the URL of the Dask dashboard.
For reference, see the Prefect Dask Task Runners documentation.
Craig Rose
07/16/2024, 2:13 AM
Marvin
07/16/2024, 2:13 AM
`DaskTaskRunner`. Unfortunately, the `start` method cannot be used directly as a context manager. Instead, you have to await it and retrieve the cluster information after the `flow` has been run.
Here's the corrected approach:
```python
from prefect import task, flow
from prefect_dask.task_runners import DaskTaskRunner


@task
def hello_task(name: str):
    print(f"Hello, {name}!")


@flow
async def dynamic_dask_flow(cluster_kwargs: dict):
    dask_task_runner = DaskTaskRunner(cluster_kwargs=cluster_kwargs)
    # Start the DaskTaskRunner
    await dask_task_runner.start()
    # Access the cluster object
    cluster = dask_task_runner.cluster
    print(f"Dask dashboard is available at: {cluster.dashboard_link}")
    # Submit tasks
    for name in ["arthur", "trillian", "ford", "marvin"]:
        hello_task.submit(name)
    # Shut down the DaskTaskRunner
    await dask_task_runner.shutdown()


if __name__ == "__main__":
    import asyncio

    cluster_kwargs = {"n_workers": 2, "threads_per_worker": 2}
    asyncio.run(dynamic_dask_flow(cluster_kwargs=cluster_kwargs))
```
In this corrected version:
1. The `flow` function is defined as `async`.
2. The `DaskTaskRunner` is started and awaited using `await dask_task_runner.start()`.
3. The cluster object is retrieved and its `dashboard_link` is printed.
4. After submitting tasks, the task runner is shut down using `await dask_task_runner.shutdown()`.
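If starting the runner by hand proves awkward, another option may be to ask for the Dask client from inside a running task. The sketch below assumes that your installed prefect-dask version exports `get_dask_client` and that the returned client exposes `dashboard_link`; check both against your environment. `dashboard_message` and `build_dashboard_flow` are hypothetical helper names.

```python
def dashboard_message(link):
    """Format the dashboard message, with a fallback when no link is exposed."""
    if not link:
        return "Dask dashboard link is not available"
    return f"Dask dashboard is available at: {link}"


def build_dashboard_flow(cluster_kwargs: dict):
    """Return a flow whose single task reports the Dask dashboard link."""
    # Imported lazily; requires prefect and prefect-dask to be installed.
    from prefect import flow, task
    from prefect_dask import DaskTaskRunner, get_dask_client

    @task
    def report_dashboard() -> str:
        # Assumption: get_dask_client yields the client for the current run.
        with get_dask_client() as client:
            return dashboard_message(client.dashboard_link)

    @flow(task_runner=DaskTaskRunner(cluster_kwargs=cluster_kwargs))
    def dashboard_flow():
        print(report_dashboard.submit().result())

    return dashboard_flow
```

Calling `build_dashboard_flow({"n_workers": 2, "threads_per_worker": 2})()` would then print the link as seen from the worker, which may differ from the address you use to reach the dashboard from your own machine.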
For further details on using `DaskTaskRunner`, refer to the Prefect Dask Task Runners documentation.
Craig Rose
07/16/2024, 2:14 AM
Marvin
07/16/2024, 2:15 AM
To access cluster details, such as the `dashboard_link`, during the execution of a flow with `DaskTaskRunner`, you can derive a custom task runner class. This task runner will capture cluster details and make them accessible. Unfortunately, the Prefect API does not provide a direct or documented way to extract this information otherwise.
Here is an example of how you can derive a custom `DaskTaskRunner` to achieve this:
```python
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


class CustomDaskTaskRunner(DaskTaskRunner):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.cluster = None

    async def start(self):
        await super().start()
        self.cluster = self._client.cluster


@task
def hello_task(name: str):
    print(f"Hello, {name}!")


@flow(task_runner=CustomDaskTaskRunner(cluster_kwargs={"n_workers": 2, "threads_per_worker": 2}))
async def dynamic_dask_flow():
    task_runner = CustomDaskTaskRunner()
    await task_runner.start()
    cluster = task_runner.cluster
    if cluster:
        print(f"Dask dashboard is available at: {cluster.dashboard_link}")
    for name in ["arthur", "trillian", "ford", "marvin"]:
        hello_task.submit(name)
    await task_runner.shutdown()


if __name__ == "__main__":
    import asyncio

    asyncio.run(dynamic_dask_flow())
```
This example defines a `CustomDaskTaskRunner` class that saves the created cluster. Then in the flow, you can access the cluster's `dashboard_link`.
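A possible alternative that avoids subclassing is to create the Dask cluster yourself and point `DaskTaskRunner` at it through its `address` argument. Because you own the cluster object, its `dashboard_link` is available before the flow starts. This is a sketch under assumptions, not a confirmed recommendation: `dashboard_port` and `run_with_own_cluster` are hypothetical helpers, and the use of `LocalCluster` assumes a single-machine setup.

```python
from urllib.parse import urlparse


def dashboard_port(link: str) -> int:
    """Extract the port from a dashboard URL such as http://127.0.0.1:8787/status."""
    port = urlparse(link).port
    if port is None:
        raise ValueError(f"no explicit port in dashboard link: {link!r}")
    return port


def run_with_own_cluster(n_workers: int = 2, threads_per_worker: int = 2) -> None:
    """Create the cluster first, then hand its address to DaskTaskRunner."""
    # Imported lazily; requires dask[distributed], prefect and prefect-dask.
    from dask.distributed import LocalCluster
    from prefect import flow, task
    from prefect_dask.task_runners import DaskTaskRunner

    @task
    def hello_task(name: str):
        print(f"Hello, {name}!")

    @flow
    def hello_flow():
        futures = [
            hello_task.submit(name)
            for name in ["arthur", "trillian", "ford", "marvin"]
        ]
        for future in futures:
            future.wait()

    with LocalCluster(
        n_workers=n_workers, threads_per_worker=threads_per_worker
    ) as cluster:
        # The cluster is ours, so the link is known before any task runs.
        print(f"Dask dashboard is available at: {cluster.dashboard_link}")
        runner = DaskTaskRunner(address=cluster.scheduler_address)
        hello_flow.with_options(task_runner=runner)()
```

The `dashboard_port` helper is only a convenience, for example when setting up port forwarding to reach the dashboard from another machine.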
For further details, refer to the Prefect Dask Task Runners documentation.