Jonathan Pou

08/04/2022, 9:01 PM
Hello! Has anyone been using Prefect 2.0 with Dask managed with Coiled? I am having issues with distributing a dataframe across multiple Dask workers within the same cluster. The cluster is deployed with 4 workers, but the dataframe is only being loaded onto 1 of the workers, leaving the other 3 idle. When I run the same script outside of a Prefect task & flow, it runs as expected: the dataframe gets distributed across the cluster for later use. I also tested another scenario where the cluster is deployed with only 1 worker, but with autoscaling enabled by passing adapt_kwargs={"maximum": 10} to DaskTaskRunner. In this scenario, it seems like the autoscaling request is ignored, as the cluster stays at size 1. Lastly, is there a way to return the standard output of a print() statement executed on a remote Dask cluster back to the Prefect logger?
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
import dask

coiled_executor = DaskTaskRunner(
	cluster_class="coiled.Cluster",
	cluster_kwargs={
		"n_workers" : 4,
		"software": "ttibi-dev",
		"shutdown_on_close": True,
		"worker_vm_types":["r6a.large"]
	},
	adapt_kwargs={"maximum": 10}
)


@task
def some_data_manipulation():
	df = dask.datasets.timeseries(
		"2000", "2020", partition_freq="2w"
	).persist()
	df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()
	return df

@flow(task_runner=coiled_executor)
def test_flow():
	some_data_manipulation.submit()

if __name__ == "__main__":	
	test_flow()

Zanie

08/04/2022, 9:13 PM
Do you know if it also only sends work to a single worker if using a non-coiled Dask cluster? Unfortunately there are some issues around retrieving logs from Dask right now, investigating a fix is on the backlog.

Jonathan Pou

08/04/2022, 9:33 PM
I just tried running it on a local Dask cluster and I'm running into the same issue. The dataframe is only trying to load on a single worker. Understood, thanks!!

Andrew Huang

08/04/2022, 10:19 PM
opened an issue here: https://github.com/PrefectHQ/prefect-dask/issues/18 please feel free to add more information (e.g. expected vs actual output / dashboard)

Sam Dyson

08/09/2022, 11:09 PM
We ran into a similar problem in the past and we think that this is actually expected behaviour. Your Prefect task is serialized and sent to the Dask cluster as a single Dask task, but Dask workers do not automatically get a client to connect to the full cluster. Thus, it will attempt to do the work within the worker itself. You need to get a worker_client inside the task, which will let the worker submit jobs to the cluster. I found this documentation helpful: https://distributed.dask.org/en/stable/task-launch.html#connection-with-context-manager
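For readers following along, here is a minimal sketch of the worker_client pattern described above, using only dask.distributed. The function names (square, sum_of_squares, run_demo) and the threads-only LocalCluster are illustrative assumptions standing in for the Coiled cluster and the Prefect task in this thread; they are not taken from the original script. Inside a Prefect task, the same `with worker_client():` block would presumably wrap the dataframe code.

```python
from dask.distributed import Client, LocalCluster, worker_client


def square(x):
    # Small unit of work that can run on any worker in the cluster.
    return x * x


def sum_of_squares(numbers):
    # This function itself runs as ONE Dask task on ONE worker.
    # worker_client() gives it a client connected to the scheduler,
    # so the sub-tasks below can be scheduled across the whole cluster.
    with worker_client() as client:
        futures = client.map(square, numbers)
        return sum(client.gather(futures))


def run_demo():
    # Assumption: a small in-process local cluster stands in for coiled.Cluster.
    with LocalCluster(
        n_workers=2,
        threads_per_worker=1,
        processes=False,
        dashboard_address=None,  # disable the dashboard for this demo
    ) as cluster:
        with Client(cluster) as client:
            # Submit the outer task; it fans out work from inside the worker.
            return client.submit(sum_of_squares, [1, 2, 3, 4]).result()


if __name__ == "__main__":
    print(run_demo())
```

Without the worker_client() context, the body of sum_of_squares would have no connection to the scheduler and all of the work would stay on the worker that received the task, which matches the behaviour reported at the top of the thread.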

Zanie

08/10/2022, 3:05 PM
Thanks Sam! That's a helpful reminder.
@Andrew Huang I'd document the correct pattern here and mark that as expected.