    Matt Wong-Kemp

    2 years ago
    When using the DaskExecutor, is it safe to share the dask cluster between the execution of the tasks and direct use of dask? The use case here is that I have a large number of small tasks I'd like to run in parallel, followed by some large joined data analysis. I'd like to gain the concurrency from running on a dask cluster in my flows, but at the same time I want to perform some data analysis using the dask DataFrame class and distribute this across a cluster as well. If I provision the cluster myself, is it safe to share the scheduler between the flow and the dataframe library? Or should I expect to need to provision my own dask cluster inside a task to run my DataFrame code on?
    Jim Crist-Harif

    2 years ago
    Yep!
    That's totally supported. I just answered this in more detail elsewhere, let me grab you a link.
    Matt Wong-Kemp

    2 years ago
    awesome, thanks. While you're here, are there any plans to support coroutines or async/await? dask is obviously built on coroutines, and my very quick glance at the executor base class tells me it's mostly working in terms of future-shaped things. This would be very useful in my scenario, where I'd like to shell out to 10k kubernetes jobs (which effectively act like coroutines rather than tasks) and then perform DataFrame analysis on the set I've got. I'm confident that this will work at the minute, but I'd like to make my cluster adaptable, so that I can use the concurrency needed for launching the tasks at the start, and the parallelism needed for the dataframe at the end.
    Jim Crist-Harif

    2 years ago
    There are no plans to enable direct coroutine tasks, no. I'm not sure I understand how coroutine tasks would make your workflow above easier from the prefect side. Prefect tasks all have some overhead and are individually managed by prefect, so having each task be a coroutine wouldn't immediately make things more efficient for you. One option would be to have a task that submits your jobs, waits on them using whatever tools you want, and returns the job results (you could even run this as a coroutine using asyncio.run). You could then use the results downstream in prefect tasks as needed (mapping, etc...).
    I may not be understanding your use case though.
    10k jobs is also a lot of k8s jobs, have you considered using a single k8s job with some level of parallelism? https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs. This won't work for all use cases, but if it does for yours it'd certainly be more efficient.
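    A minimal sketch of the single-task option above, using only the standard library. The names run_job, run_all, submit_and_wait and MAX_IN_FLIGHT are hypothetical, and the sleep stands in for submitting a real job and polling it. In a flow, submit_and_wait would be the body of one prefect task (the decorator is left out so the sketch runs without prefect installed).

```python
import asyncio

MAX_IN_FLIGHT = 100  # assumed cap on how many jobs are awaited at once


async def run_job(job_id: int, limit: asyncio.Semaphore) -> dict:
    """Hypothetical stand-in for submitting one external job and awaiting it."""
    async with limit:
        # A real version would submit the job and poll its status here;
        # the sleep only simulates waiting on the external service.
        await asyncio.sleep(0.01)
        return {"job_id": job_id, "status": "succeeded"}


async def run_all(job_ids) -> list:
    # Create the semaphore inside the running event loop.
    limit = asyncio.Semaphore(MAX_IN_FLIGHT)
    # gather returns results in the same order as the awaitables passed in.
    return await asyncio.gather(*(run_job(j, limit) for j in job_ids))


def submit_and_wait(job_ids) -> list:
    """Synchronous entry point: this is what a single task would call."""
    return asyncio.run(run_all(job_ids))


results = submit_and_wait(range(1000))
```

    Because the waiting happens on one event loop, the 1000 simulated jobs occupy a single worker thread rather than 1000 of them. The trade-off is the one raised later in the thread: prefect sees one task, so success, failure and retries apply to the whole batch.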
    Matt Wong-Kemp

    2 years ago
    I guess I'm distinguishing here between calling out to some outside service and waiting for that to finish in prefect, and actually doing heavy lifting in prefect. If you ignore the k8s bit for a second, imagine instead that I call out to some service 10,000 times, each call takes somewhere between 10 and 30 minutes, and then it presents me with some result. Each of these results is independent of the others, but each forms a subset of some larger dataset.
    That said, I'd still like the power that prefect gives me over restarts, tracking failures and error reporting, which I get from having 10,000 mapped tasks. If I submit 10k templated requests to this service inside one task, then I can only report success or failure, or choose to rerun, at the complete-set level. But if 10% failed due to infra issues, I'd like to just retry those without rerunning the other 90%. So far so sensible: this says I should use separate prefect tasks.
    But if I understand correctly, the only way I can make my graph wait those 10-30 minutes for execution is to either busy-wait or dumb-sleep inside the tasks. So from the point of view of a dask cluster, if I want to run these 10k tasks in 30 minutes, I need a dask cluster that can have 10k functions all busy-waiting while my 10k kubernetes jobs run elsewhere, which might double the cost of running this in a cloud environment. If I could naturally express these service calls as coroutines instead of tasks, which dask does appear to support (https://distributed.dask.org/en/latest/asynchronous.html), then I could run a much smaller cluster to generate the data, and later adapt my cluster to be much larger for parallel heavy-CPU work.
    Jim Crist-Harif

    2 years ago
    Ah, gotcha. Dask does not support async tasks; it supports submitting synchronous tasks from an asynchronous context. All work is still run in a thread pool as synchronous functions.
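    The distinction can be illustrated with a standard-library analogy. This is not dask's API: ThreadPoolExecutor and loop.run_in_executor stand in for a worker's thread pool, and blocking_call is a hypothetical placeholder for a task that sleeps while an external job runs. The caller is asynchronous, but each call still holds a thread for its whole wait.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_call(i: int) -> int:
    """Synchronous function: occupies its thread for the entire wait."""
    time.sleep(0.05)  # simulated wait on an external service
    return i * 2


async def main(n_calls: int, n_threads: int) -> list:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        # Submission is non-blocking from the coroutine's point of view...
        futures = [
            loop.run_in_executor(pool, blocking_call, i) for i in range(n_calls)
        ]
        # ...but only n_threads calls can be waiting at any one time.
        return await asyncio.gather(*futures)


start = time.perf_counter()
values = asyncio.run(main(n_calls=8, n_threads=2))
elapsed = time.perf_counter() - start
```

    With 8 calls on 2 threads the waits run in 4 rounds, so the total time scales with the number of calls divided by the number of threads. That is the cost concern in the question above: 10k concurrently waiting tasks need roughly 10k threads across the cluster.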
    Matt Wong-Kemp

    2 years ago
    aw, man. ok, thanks, i'll dig around a bit.
    Jim Crist-Harif

    2 years ago
    If you don't care about tracking each individual k8s job as a separate prefect task, you could do this efficiently as a single task. Alternatively, if you only want tracking for reporting (but not restarting) purposes, you could use a single task to submit and monitor, then map over the results to report failures.
    Sorry I don't have a better suggestion here
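    A rough sketch of the second alternative, one step that submits and monitors, followed by a per-result reporting step. It is plain Python with hypothetical names (submit_and_monitor, report) and a simulated failure rule; in a flow, submit_and_monitor would be a single task and report would be mapped over its output.

```python
def submit_and_monitor(job_ids) -> list:
    """Hypothetical single step: run every job and record its outcome.

    It never raises, so one failed job does not fail the whole batch.
    """
    records = []
    for job_id in job_ids:
        try:
            if job_id % 10 == 0:  # simulated infrastructure failure
                raise RuntimeError("simulated infra error")
            records.append({"job_id": job_id, "ok": True, "error": None})
        except Exception as exc:
            records.append({"job_id": job_id, "ok": False, "error": str(exc)})
    return records


def report(record: dict) -> int:
    """Would be the mapped step: raises only to surface that job's failure."""
    if not record["ok"]:
        raise RuntimeError(f"job {record['job_id']} failed: {record['error']}")
    return record["job_id"]


records = submit_and_monitor(range(20))
failed = [r["job_id"] for r in records if not r["ok"]]
```

    Each failure then shows up as its own failed mapped result, which covers reporting. Rerunning a failed report step would not rerun the underlying job, which matches the "reporting (but not restarting)" caveat above.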
    Matt Wong-Kemp

    2 years ago
    np, thanks