Is there any way to access the Dask client from within Prefect and do very granular parallelism using the dask.{dataframe,delayed} APIs?
# ask-community
h
Is there any way to access the Dask client from within Prefect and do very granular parallelism using the dask.{dataframe,delayed} APIs? I have "large ticket" embarrassingly parallel computations that I map with prefect.task, but inside these tasks I want to let Dask work on chunks of data arrays. Of course I'd also like to control this parallelism to some extent, or there will be thousands of concurrently running tasks.
j
Hi Haf, let me look into this and I'll get back to you shortly.
You're absolutely able to control any settings you would want for your Dask cluster; this doc and the blog post linked at the bottom might provide more information: https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html#further-steps While I was not able to find any documentation around it, if you actively want to call Dask code from within your tasks (and you are running on a Dask executor), I believe you are able to call
distributed.worker_client
from within tasks. I cannot guarantee this is recommended, but it appears possible. Here is an example from another thread where someone seems to be doing this: https://prefect-community.slack.com/archives/CL09KU1K7/p1634664171072100?thread_ts=1634556225.467000&cid=CL09KU1K7 I will forward this question to see if I can get you a better answer.
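For example, something along these lines should cap how many task runs execute at once. This is a rough sketch assuming Prefect 1.x, where DaskExecutor's cluster_kwargs are passed through to distributed.LocalCluster; the task, flow, and data here are just placeholders:
```python
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def process_chunk(chunk):
    # "large ticket" embarrassingly parallel work goes here
    return sum(chunk)

with Flow("granular-parallelism") as flow:
    results = process_chunk.map([[1, 2], [3, 4], [5, 6]])

# Size the temporary Dask cluster explicitly so mapped tasks can't fan out
# into thousands of concurrent runs: at most n_workers * threads_per_worker
# task runs execute at any one time.
flow.executor = DaskExecutor(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
)

if __name__ == "__main__":
    flow.run()
```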
k
You can do it like this
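Roughly this pattern — a sketch assuming a Prefect 1.x task that is already running on a DaskExecutor; the array sizes and chunking are arbitrary:
```python
import dask.array as da
from dask.distributed import worker_client
from prefect import task

@task
def chunked_computation(n: int) -> float:
    # worker_client() secedes from the Dask worker's thread pool, so the
    # nested dask.array work can be scheduled on the same cluster without
    # deadlocking the worker that is running this task.
    with worker_client() as client:
        x = da.random.random((n, n), chunks=(1_000, 1_000))
        mean = (x @ x.T).mean()
        return client.compute(mean).result()
```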
h
Can you help me understand how this all works? How do checkpoint=False, saving to GCS, and the distribution of tasks across a distributed Dask cluster all work in conjunction with ephemeral nodes? E.g. if I have
A -> B(checkpoint=False) -> C(checkpoint=True) -> D
A is config; B creates an ML model in a very specific format that crashes the serialiser if I checkpoint it; C is a computation that takes about 1½ hours but may be interrupted; D is a save-to-storage task that is idempotent and can happily be retried. How does all of this work together? I'm specifically looking at crash-resume semantics (not grey failure here). A crashes: rerun from start. B crashes: rerun from cached A. C crashes: rerun from cached A, then B, because B is not checkpointed. D crashes: retry with the cached result from C? Now if I add
worker_client
into the mix, am I right to assume that if the parent node, say (a), crashes while work is distributed across (b) and (c) as well, the flow behaves as in the previous paragraph, but if (b) crashes, Dask will transparently rerun it?
As for grey failure: since I'm running on ephemeral nodes I actually do get notified, but I don't think Prefect picks up on this; instead it marks the task's heartbeat as timed out and fails the whole flow.
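Concretely, the A -> B -> C -> D flow I have in mind is wired up roughly like this (a sketch only; the bucket name, task names, and task bodies are placeholders):
```python
from datetime import timedelta
from prefect import Flow, task
from prefect.engine.results import GCSResult

gcs_result = GCSResult(bucket="my-bucket")  # placeholder bucket

@task
def load_config():
    # A: cheap config, rerun from scratch after a crash
    return {"lr": 0.01}

@task(checkpoint=False)
def build_model(config):
    # B: never persisted (checkpointing it crashes the serialiser), so a
    # resumed run has to recompute it from A's cached result
    ...

@task(checkpoint=True)
def long_computation(model):
    # C: ~1.5 h of work; its output is checkpointed to GCS
    ...

@task(max_retries=3, retry_delay=timedelta(minutes=5))
def save_to_storage(output):
    # D: idempotent save, safe to retry from C's cached result
    ...

with Flow("train", result=gcs_result) as flow:
    config = load_config()
    model = build_model(config)
    output = long_computation(model)
    save_to_storage(output)
```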
k
I think your understanding is right. Basically it will re-run from the last checkpoint available. I am not seeing how A would fail while distributed work from B or C is being done. Dask's failover will kick in and try to retry whatever failed (it depends on the error); this happens if you have a
KilledWorker
error, I think.
h
Why doesn't it reset the computations when a KilledWorker happens? I have specified a GCS result, it's running using Cloud, and I have retries on most tasks?
k
It does, up to a certain number of tries. The nanny service should spin it back up. This doc is good if you haven't seen it yet.
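If the knob you want is the number of tries, I believe it's Dask's allowed-failures setting (default 3): once that many workers holding a task have died, the scheduler gives up and raises KilledWorker. A sketch of raising it, assuming the DaskExecutor spins up a LocalCluster in the same process:
```python
import dask
from prefect.executors import DaskExecutor

# Let a task survive more worker deaths before the scheduler raises
# KilledWorker; useful when ephemeral nodes disappear mid-computation.
dask.config.set({"distributed.scheduler.allowed-failures": 10})

# With process-based workers (the LocalCluster default), each worker runs
# under a nanny that restarts the process if it dies.
executor = DaskExecutor(cluster_kwargs={"n_workers": 4, "processes": True})
```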