# ask-community
**Pascal Jahan Elahi (Pawsey)**
Hi all and @Marvin, I have a subflow `B1` hanging if it is called from flow `A` if and only if it uses a different `DaskTaskRunner`. Does anyone know what the issue could be? The rough code is:
```python
runner1 = DaskTaskRunner(...)  # using a SLURMCluster, i.e. cluster_class = "dask_jobqueue.SLURMCluster"
runner2 = DaskTaskRunner(...)  # another SLURMCluster

@flow
def subflowB():
    ...

@flow
def flowA():
    subflowB.with_options(task_runner=runner2)()

if __name__ == "__main__":
    # works
    flowA.with_options(task_runner=runner2)()
    # hangs
    flowA.with_options(task_runner=runner1)()
```
I am assuming this has to do with the SLURM-based Dask task runner at the higher level interacting with a subflow sent to it that then tries to launch another SLURM-based Dask task runner. Is this the case?
**Marvin**
It looks like you're encountering a common issue when dealing with nested flows and different task runners using Dask with SLURM clusters in Prefect. Here's what might be happening based on the Prefect documentation:

1. Task Runner Configuration: The Prefect `DaskTaskRunner` can either connect to an existing cluster or create a temporary cluster for each flow run. If you're using a `SLURMCluster` for both of your flows, the issue might stem from how these clusters are being managed. If `runner1` and `runner2` are connected to different `SLURMCluster`s, then when `subflowB` is run within `flowA`, it might be trying to launch its own SLURM cluster (with `runner2`) while there's already an existing one (from `runner1`), causing conflicts.
2. Resource Contention: Using different `SLURMCluster`s for your flows might lead to resource competition or deadlocks. If multiple flow runs try to allocate resources concurrently on the SLURM scheduler without proper coordination, one or both of them may hang.
3. Cluster Setup and Addressing: Ensure that the `SLURMCluster` configurations are correct and are not inadvertently using overlapping resources or misaddressing the scheduler.

To troubleshoot this further, you might want to:
- Check the configurations of the `SLURMCluster` instances to ensure they're not conflicting.
- Consider using a temporary Dask cluster for each flow to avoid conflicts, which you can do by specifying the `cluster_class` and required parameters, as sketched below (more details here for Prefect 2.x and Prefect 3.x).
- Monitor log output from the Dask scheduler and workers for any error messages.

Sometimes dealing with multiple clusters is like juggling flaming chainsaws: exciting, but slightly dangerous without the right safety gear!
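For illustration, a minimal sketch of giving each flow its own temporary SLURM-backed runner might look like this (the partition names, resource sizes, and walltime below are placeholder assumptions, not values from this thread):

```python
from prefect import flow
from prefect_dask import DaskTaskRunner

# Each runner builds its own temporary SLURMCluster for the duration of the flow run.
# All cluster_kwargs values here are illustrative placeholders.
gpu_runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={"queue": "gpu", "cores": 32, "memory": "64GB", "walltime": "01:00:00"},
    client_kwargs={"set_as_default": False},
)
cpu_runner = DaskTaskRunner(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={"queue": "work", "cores": 8, "memory": "32GB", "walltime": "01:00:00"},
    client_kwargs={"set_as_default": False},
)

@flow(task_runner=cpu_runner)
def subflow_b():
    ...

@flow(task_runner=gpu_runner)
def flow_a():
    # The subflow gets its own cluster, separate from flow_a's cluster.
    subflow_b()
```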
**Nate**
hi @Pascal Jahan Elahi (Pawsey) - please use #C04DZJC94DC to ask marvin questions! thanks
**Pascal Jahan Elahi (Pawsey)**
Hi @Nate, the issue is that Marvin doesn't always know the answer to these subtle questions, so I think it warrants asking the community.
**Nate**
you're right marvin often does poorly on subtle questions, i just mean that so that folks don't start using this channel for marvin
as far as your question goes, if you're bringing your own cluster, what do the kwargs look like?
**Pascal Jahan Elahi (Pawsey)**
Hi @Nate, apologies for the long period of silence, I was working on other projects and then went on leave. I am back. For the SLURM cluster, I think the configuration is not important, but here's the output of printing the class, `cluster_kwargs`, `client_kwargs` and `concurrency_type` for a cluster with a single SLURM partition (the main thing is that I want to use different resource requests):
```
<class 'dask_jobqueue.slurm.SLURMCluster'>
Cluster kwargs {'name': 'gpu', 'cores': 70, 'memory': '480GB', 'processes': 1, 'python': 'python3', 'death_timeout': 30, 'local_directory': '$TMPDIR', 'shared_temp_directory': None, 'worker_command': 'distributed.cli.dask_worker', 'worker_extra_args': [], 'interface': 'ibp1s0f1', 'shebang': '#!/usr/bin/bash', 'queue': 'gpu', 'walltime': '1:00:00', 'env_extra': None, 'job_script_prologue': ['source ~/.bashrc', 'module load cmake/3.24.4 nvhpc/24.5 python/3.11.10', 'module load hpcx-mt-ompi', 'export OMP_PLACES=cores', 'export OMP_MAX_ACTIVE_LEVELS=4'], 'job_cpu': 32, 'job_mem': None, 'job_extra': None, 'job_extra_directives': ['--gres=gpu:1', '-w n00[1-4]', '--nodes=1', '--exclusive'], 'job_directives_skip': [], 'log_directory': None, 'scheduler_options': {}}
Client kwargs {'set_as_default': False}
Concurrency  TaskConcurrencyType.PARALLEL

--------------
cpu
<class 'dask_jobqueue.slurm.SLURMCluster'>
Cluster kwargs {'name': 'cpu', 'cores': 70, 'memory': '480GB', 'processes': 4, 'python': 'python3', 'death_timeout': 30, 'local_directory': '$TMPDIR', 'shared_temp_directory': None, 'worker_command': 'distributed.cli.dask_worker', 'worker_extra_args': [], 'interface': 'ibp1s0f1', 'shebang': '#!/usr/bin/bash', 'queue': 'gpu', 'walltime': '1:00:00', 'env_extra': None, 'job_script_prologue': ['source ~/.bashrc', 'module load cmake/3.24.4 nvhpc/24.5 python/3.11.10', 'module load hpcx-mt-ompi', 'export OMP_PLACES=cores', 'export OMP_MAX_ACTIVE_LEVELS=4'], 'job_cpu': 8, 'job_mem': '32GB', 'job_extra': None, 'job_extra_directives': ['-w n00[5-6]', '--nodes=1'], 'job_directives_skip': [], 'log_directory': None, 'scheduler_options': {}}
Client kwargs {'set_as_default': False}
Concurrency  TaskConcurrencyType.PARALLEL
```
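For context, output like the above can be produced with something along these lines (a sketch; `gpu_runner` and `cpu_runner` are stand-in names for the two `DaskTaskRunner` objects, and this assumes Prefect 2.x with prefect-dask):

```python
# gpu_runner and cpu_runner are DaskTaskRunner instances built with
# cluster_class="dask_jobqueue.SLURMCluster" and different cluster_kwargs.
for name, runner in [("gpu", gpu_runner), ("cpu", cpu_runner)]:
    print(name)
    print(runner.cluster_class)              # resolved to dask_jobqueue.slurm.SLURMCluster
    print("Cluster kwargs", runner.cluster_kwargs)
    print("Client kwargs", runner.client_kwargs)
    print("Concurrency ", runner.concurrency_type)  # TaskConcurrencyType.PARALLEL
    print("--------------")
```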
Note that I don't see an issue when running subflows sequentially that use different Dask task runners, just issues with concurrent subflows.
I also have an issue if the main flow uses a DaskTaskRunner and then subflows are called using a different one.
It does appear that this use of DaskTaskRunners (using SLURMCluster at least) cannot have two subflows running at the same time.
If I set the task runners to be, say, the ConcurrentTaskRunner, then all is well. I will try playing with non-SLURM Dask task runners to see whether the issue is SLURM.
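For the non-SLURM test, a minimal sketch might swap the `SLURMCluster` for a plain `distributed.LocalCluster` and keep the same nesting (worker counts here are placeholders):

```python
from prefect import flow
from prefect_dask import DaskTaskRunner

# Two temporary local Dask clusters instead of SLURM-backed ones, to check
# whether the hang is specific to dask_jobqueue.SLURMCluster.
local_runner1 = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 1},
)
local_runner2 = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 1},
)

@flow
def subflowB():
    ...

@flow
def flowA():
    subflowB.with_options(task_runner=local_runner2)()

if __name__ == "__main__":
    # If this also hangs, the problem is not SLURM-specific.
    flowA.with_options(task_runner=local_runner1)()
```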