# prefect-getting-started
I'm trying to test prefect on our slurm HPC with this flow:
```python
# log_flow.py
from time import sleep
import sys

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(queue='fast', cores=2, memory="5 GB")
cluster.scale(2)

@task
def log_task(name):
    sleep(2)
    print(name)

@flow(task_runner=DaskTaskRunner(cluster.scheduler_address))
def log_flow(names: list[str]):
    for name in names:
        log_task.submit(name)

if __name__ == "__main__":
    name = sys.argv[1]
    log_flow([name, "arthur", "trillian", "ford", "marvin"])
```
It runs fine locally (on a login node), e.g. `python log_flow.py Marvin`, submitting jobs to our cluster, but I get this error when trying to build a deployment:
```
Script at 'log_flow.py' encountered an exception: RuntimeError("There is no current event loop in thread 'AnyIO worker thread'.")
```
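(For reference, the deployment is built with something along the lines of `prefect deployment build log_flow.py:log_flow -n log-flow --apply`; the exact name and flags may differ.)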
I also tried defining the cluster this way, as described here, but this just hangs, even running locally:
```python
@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.SLURMCluster",
        cluster_kwargs={"queue": "fast", "cores": 2, "memory": "5 GB"},
    )
)
```
Moving the `cluster` setup to a separate process and replacing `cluster.scheduler_address` with the running cluster's address as a hard-coded string works, but it means the flow can't define its own resource needs.
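For concreteness, that workaround looks roughly like this (the file name and scheduler address below are just placeholders): a small script run separately on the login node keeps the cluster alive,

```python
# start_cluster.py -- hypothetical helper, run separately on the login node
from dask_jobqueue import SLURMCluster

if __name__ == "__main__":
    cluster = SLURMCluster(queue="fast", cores=2, memory="5 GB")
    cluster.scale(2)
    print(cluster.scheduler_address)  # e.g. tcp://10.0.0.1:8786 (placeholder)
    input("Cluster running; press Enter to shut it down...")
```

and the flow then connects to that printed address as a hard-coded string:

```python
# log_flow.py -- same flow as above, but pointing at the already-running scheduler
from time import sleep

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

@task
def log_task(name):
    sleep(2)
    print(name)

# scheduler address hard-coded from the output of start_cluster.py (placeholder value)
@flow(task_runner=DaskTaskRunner(address="tcp://10.0.0.1:8786"))
def log_flow(names: list[str]):
    for name in names:
        log_task.submit(name)
```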
```
dask                   2023.3.1
dask-jobqueue          0.8.1
prefect                2.10.13
prefect-dask           0.2.4
python                 3.9.13
```
Any ideas?
I believe @Tim Galvin is using a SLURM cluster and might be able to advise on what's working for him along with what you might need to change.