haf

    10 months ago
    Is there any way to access the Dask client from within Prefect and do very granular parallelism using the dask.{dataframe,delayed} API? I have "large ticket" embarrassingly parallel computations that I map with prefect.task, but then inside these tasks I want Dask to work on chunks of data arrays. Of course I'd also like to control this parallelism to some extent, or there will be thousands of concurrently running tasks.
    Jake Kaplan

    10 months ago
    Hi Haf, let me look into this and will get back to you shortly.
    You're absolutely able to control any settings you would want on your Dask cluster; this doc and the blog post linked at the bottom might provide more information: https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html#further-steps
    While I was not able to find any documentation around it, if you actively want to call Dask code from within your tasks (and you are running on a Dask executor), I believe you are able to call distributed.worker_client from within tasks. I cannot guarantee this is recommended, but it appears possible. Here is an example from another thread where someone seems to be doing this: https://prefect-community.slack.com/archives/CL09KU1K7/p1634664171072100?thread_ts=1634556225.467000&cid=CL09KU1K7
    I will forward this question to @Kevin Kho to see if I can get you a better answer.

    Kevin Kho

    10 months ago
    You can do it like this
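    (Kevin's actual snippet wasn't captured in this export. Below is a hedged sketch of the worker_client pattern he was most likely showing, written with plain dask.distributed so it runs standalone; the Prefect @task wrapper is omitted, and big_task/square are made-up names standing in for a mapped task and its chunk function.)

    ```python
    # Sketch, not Kevin's original code: a function running on a Dask worker
    # (e.g. the body of a Prefect task under DaskExecutor) can use
    # worker_client() to get a client bound to the same scheduler and
    # fan out its own fine-grained sub-work.
    from dask.distributed import Client, LocalCluster, worker_client


    def square(x):
        # hypothetical per-chunk computation ("small ticket" work)
        return x * x


    def big_task(chunks):
        # stands in for a @task-decorated function in the flow
        with worker_client() as client:
            futures = client.map(square, chunks)
            return sum(client.gather(futures))


    if __name__ == "__main__":
        # in-process cluster just for demonstration
        with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster:
            with Client(cluster) as client:
                print(client.submit(big_task, [1, 2, 3]).result())  # 14
    ```

    The sub-tasks submitted through worker_client count as ordinary Dask tasks, which is also how you keep a lid on total parallelism: they share the same worker pool as the "large ticket" Prefect tasks.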
    haf

    10 months ago
    Can you help me understand how this all works? How does checkpoint=False, saving to GCS and the distribution of tasks across a distributed Dask cluster all work in conjunction with ephemeral nodes? E.g. if I have
    A -> B(checkpoint=False) -> C(checkpoint=True) -> D
    A is config. B creates an ML model in a very specific format that crashes the serialiser if I checkpoint it. C is a computation that takes about 1½ hours but may be interrupted. D is a save-to-storage task that is happily retried and is idempotent.
    How does all of this work together? I'm specifically looking at crash-resume semantics (not grey failure here). A crashes: rerun from start; B: rerun from cached A; C: rerun from cached A, then B, because B is not checkpointed; D: retry with the cached result from C?
    Now if I add worker_client into the mix, am I right to assume that if the parent node, say (a), crashes while work has been distributed across (b) and (c) as well, the flow goes like the previous paragraph, but if (b) crashes, Dask will transparently rerun it?
    As for grey failure: since I'm running on ephemeral nodes I actually do get notified, but I don't think Prefect picks this up; instead it marks the task's heartbeat as timed out and fails the whole flow.
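    (Haf's A → B → C → D scenario can be modeled with a toy resume simulation. This is pure Python, not Prefect internals; the demand-driven lookup and the cache dict are assumptions standing in for Prefect 1.x Results-based checkpointing.)

    ```python
    # Toy model of crash-resume semantics, NOT actual Prefect code:
    # checkpointed results persist across runs; on resume, a missing
    # result is recomputed from the nearest cached upstream.
    cache = {}   # stands in for GCS-backed checkpointed results
    ran = []     # which tasks actually executed

    TASKS = {
        # name: (upstream, checkpointed?)
        "A": (None, True),    # config
        "B": ("A",  False),   # model build, checkpoint=False (serialiser crash)
        "C": ("B",  True),    # the 1.5 h computation
        "D": ("C",  True),    # idempotent save-to-storage
    }

    def get(name):
        upstream, checkpoint = TASKS[name]
        if checkpoint and name in cache:
            return cache[name]            # resume from persisted result
        if upstream is not None:
            get(upstream)                 # recompute missing inputs first
        ran.append(name)
        result = f"{name}-result"
        if checkpoint:
            cache[name] = result
        return result

    # first run crashes right after B: only A made it into the cache
    get("B")
    assert ran == ["A", "B"] and set(cache) == {"A"}

    # resume targeting D: A is skipped (cached), B must rerun
    # (no checkpoint), then C and D run
    ran.clear()
    get("D")
    print(ran)   # ['B', 'C', 'D']
    ```

    This matches the reading in the question: losing C costs you a rerun of B as well, precisely because B opted out of checkpointing.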
    Kevin Kho

    10 months ago
    I think your understanding is right. Basically it will re-run from the last checkpoint available. I am not seeing how A would fail while distributed work from B or C is being done. The failover of Dask will kick in and try to retry stuff that failed (depending on the error); this happens if you have a KilledWorker error, I think.
    haf

    9 months ago
    Why doesn't it reset the computations when KilledWorker happens? I have specified a GCS result, it's running in the cloud, and I have retries on most tasks?
    Kevin Kho

    9 months ago
    It does, up to a certain number of tries. The nanny service should spin the worker back up. This doc is good if you haven't seen it yet
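    (The "certain number of tries" Kevin mentions is, I believe, Dask's distributed.scheduler.allowed-failures setting: the scheduler marks a key as KilledWorker once it has been involved in that many worker deaths, 3 by default per the distributed docs. A sketch of raising it for flaky ephemeral nodes; the value 10 is an arbitrary example:)

    ```python
    import dask

    # How many times the scheduler will re-run work lost to a dying
    # worker before giving up with KilledWorker; raising it makes the
    # flow more tolerant of ephemeral/preemptible nodes disappearing.
    dask.config.set({"distributed.scheduler.allowed-failures": 10})

    print(dask.config.get("distributed.scheduler.allowed-failures"))  # 10
    ```

    This needs to be set where the scheduler runs (or via the DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES environment variable) before the cluster starts, not inside a task.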