dh
12/17/2020, 2:39 PM

Jim Crist-Harif
12/17/2020, 2:44 PM
(Whether a task ends in a Finished state or any other state is task dependent.)
What executor are you using here?
dh
12/17/2020, 3:16 PM
> That said, tasks shouldn’t be submitted until their upstream tasks have been and shouldn’t start until the upstream tasks have finished
Could you point me to the line that produces this behavior? It’s probably because of my limited knowledge, but I can’t seem to find a line like
state = self.check_upstream_finished(state, upstream_states=upstream_states)
in FlowRunner itself before [1] happens (not TaskRunner).
[1]: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/flow_runner.py#L593
Jim Crist-Harif
12/17/2020, 3:25 PM
The LocalExecutor handles this, since executor.submit won't return until the task is "completed" (note that a task that has finished running isn't necessarily in a Finished state; it could "complete" in any valid state).
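For illustration, a rough sketch of that eager behavior (paraphrased, not the actual Prefect source; the class name and method set here follow the 0.x executor pattern):
```
# Sketch of the LocalExecutor's eager semantics (paraphrased, not the real source).
# submit() runs the function immediately and returns its result, so the flow runner
# only moves on once the task run has already finished (in whatever state it ended in).
class EagerLocalExecutor:
    def submit(self, fn, *args, **kwargs):
        # Run synchronously; the return value stands in for a "future".
        return fn(*args, **kwargs)

    def wait(self, futures):
        # Everything was computed in submit(), so there is nothing left to wait for.
        return futures
```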
Jim Crist-Harif
12/17/2020, 3:26 PM
The DaskExecutor runs fine on SLURM (using https://jobqueue.dask.org/en/latest/), and should be more performant than submitting a new SLURM job per task.
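For example, something along these lines (a sketch only: the import path assumes a Prefect 0.14-era release, and all resource values are placeholders to adapt to your SLURM setup):
```
from prefect import Flow, task
from prefect.executors import DaskExecutor  # prefect.engine.executors on older 0.x releases

@task
def say_hello():
    print("hello from a SLURM-backed dask worker")

with Flow("slurm-example") as flow:
    say_hello()

# Sketch: have Prefect start a dask-jobqueue SLURMCluster for this flow run.
executor = DaskExecutor(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={
        "queue": "general",       # SLURM partition (placeholder)
        "cores": 8,
        "memory": "32GB",
        "walltime": "01:00:00",
    },
    adapt_kwargs={"minimum": 1, "maximum": 4},  # optional adaptive scaling
)

flow.run(executor=executor)
```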
dh
12/18/2020, 5:45 PM
> Ah, yeah, we don’t really support custom executors.
May I ask for more context on this? Does this mean there’s no sophisticated example “as of yet” for a custom executor, or is there some intrinsic limitation to how a Prefect Executor is assumed to behave such that writing a custom executor (outside that behavior) would be futile?
dh
12/18/2020, 5:49 PM
> Is there a reason you’re using a custom executor here?
I had a similar discussion with Joe Schmid in this thread: https://prefect-community.slack.com/archives/CL09KU1K7/p1608134659195800
Basically, we need to run PyTorch distributed code (involving multiple nodes, with multiple GPUs in each node, under an NVIDIA DGX system where resource groups (cgroups) and jobs are managed by SLURM). My understanding (which may be flawed) is that Dask currently stops at supporting a “single node” CudaCluster only under NVIDIA DGX: this is partly suggested here https://blog.dask.org/2019/06/09/ucx-dgx, and we heard from other sources…
dh
12/18/2020, 5:54 PM
> Dask handles this for us automatically (since it has its own dependency scheduler)
Does this mean the Dask scheduler will topologically sort the DAG again, possibly in an order different from the topological order defined by Flow.sorted_tasks?
Or will the topological order not change, but the Dask scheduler will check whether upstream tasks are completed or not?
Our custom executor is not doing that check currently and just delegates it to TaskRunner, but I think we can add that logic to our executor as well, and hopefully the reported issue will be gone.
dh
12/21/2020, 5:06 PM

Jim Crist-Harif
12/21/2020, 11:12 PM
> Does this mean there’s no sophisticated example “as of yet” for a custom executor, or is there some intrinsic limitation to how a Prefect Executor is assumed to behave such that writing a custom executor (outside that behavior) would be futile?
The Executor interface is not a public interface. We reserve the right to change and break it at any point. The only thing that is public for `Executor`s is their constructors (i.e. we won't change the signature for DaskExecutor without warning). You can write your own executor by reading the code and seeing what's required here, but we won't guarantee that things won't break between releases if the interface changes.
> My understanding (which may be flawed) is that Dask currently stops at supporting a “single node” CudaCluster only under NVIDIA DGX: this is partly suggested here https://blog.dask.org/2019/06/09/ucx-dgx, and we heard from other sources…
Are you saying that the only way to work with GPUs in Dask is on a single node using CudaCluster? This is incorrect. Enabling GPU support for a specific backend may require some low-level configuration, but all of the Dask cluster managers are just thin layers around the backing platform for submitting jobs. Since SLURM lets you submit processes with GPUs as resources, adding those resources to the SLURMCluster should work fine: https://jobqueue.dask.org/en/latest/examples.html#slurm-deployment-providing-additional-arguments-to-the-dask-workers
That said, if all you want to do is run a docker container, you could submit the job from within a Prefect task and not rely on a distributed executor (the LocalDaskExecutor should work fine here). Then Prefect is only used to orchestrate transient jobs on the backing platform (SLURM in your case), rather than having several long-running jobs for each Dask worker. Depending on your use case, this might work fine.
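For example, a sketch of that pattern (import paths assume a Prefect 0.14-era release; the batch script name is illustrative):
```
import subprocess

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor  # prefect.engine.executors on older 0.x

@task
def run_on_slurm(batch_script: str) -> None:
    # Submit a transient SLURM job and block until it terminates.
    # `sbatch --wait` exits with the job's exit code, so a failed job
    # raises here and the Prefect task run fails accordingly.
    subprocess.run(["sbatch", "--wait", batch_script], check=True)

with Flow("pytorch-distributed-training") as flow:
    run_on_slurm("train_distributed.sbatch")  # placeholder batch script

if __name__ == "__main__":
    flow.run(executor=LocalDaskExecutor())
```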
> Or will the topological order not change, but the Dask scheduler will check whether upstream tasks are completed or not?
The Prefect flow-runner is currently highly coupled to Dask's semantics. It happily submits tasks that have uncompleted dependencies, knowing that Dask will handle waiting until all upstream tasks are completed before starting them. Any custom executor would also have to implement these semantics. But as I said above, I highly recommend avoiding writing your own executor. There's likely a simpler way to accomplish what you want without requiring this level of effort on your end, and we make no guarantees on the interface of executors.
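To illustrate the semantics being relied on here with plain dask.distributed (not Prefect internals): when a future is passed as an argument to client.submit, the scheduler holds the downstream call until that future has completed, so both can be submitted up front.
```
from dask.distributed import Client

def load():
    return 21

def double(x):
    return x * 2

client = Client()             # local cluster, just for illustration
a = client.submit(load)       # upstream "task"
b = client.submit(double, a)  # submitted immediately; dask waits for `a` before running it
print(b.result())             # 42
```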
dh
12/29/2020, 2:14 PM
I didn’t know Executor is a private interface, nor did I know the FlowRunner has knowledge of Dask within itself (not injected); the blame is on me for not being extra cautious with an actively developing project like Prefect.
Re: multi-node GPU / SLURM support in Dask: I think I shall try your proposal. Without deep understanding, I have been concerned about whether Dask workers, deployed on SLURM worker nodes, would interfere with GPU code (i.e. PyTorch distributed code that involves CUDA and IPC using NCCL). But as you said, I think my use case would suffice to just “orchestrate transient jobs on the backing platform (SLURM in your case)” and not involve any Dask workers.
Thank you once again. I will share details as I discover them, as other folks in the future may have similar questions in the community.