Daniel
06/14/2023, 9:25 PM

# log_flow.py
from time import sleep
import sys
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(queue='fast', cores=2, memory="5 GB")
cluster.scale(2)

@task
def log_task(name):
    sleep(2)
    print(name)

@flow(task_runner=DaskTaskRunner(cluster.scheduler_address))
def log_flow(names: list[str]):
    for name in names:
        log_task.submit(name)

if __name__ == "__main__":
    name = sys.argv[1]
    log_flow([name, "arthur", "trillian", "ford", "marvin"])
It runs fine locally (on a login node), e.g. python log_flow.py Marvin, submitting jobs to our cluster, but I get this error when trying to build a deployment:

Script at 'log_flow.py' encountered an exception: RuntimeError("There is no current event loop in thread 'AnyIO worker thread'.")
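For context, that message is the one asyncio raises when asyncio.get_event_loop() is called in a non-main thread that has no event loop set. A plausible reading (an assumption, not confirmed in this thread) is that building the deployment imports log_flow.py inside such a worker thread, so the module-level SLURMCluster(...) call, which needs an event loop somewhere in its setup, runs there, whereas python log_flow.py runs it in the main thread. The stdlib-only sketch below reproduces the message; the thread name is chosen only to match the error, and nothing in it touches Prefect or Dask.

```python
# Stdlib-only reproduction of the error text; no Prefect or Dask involved.
import asyncio
import threading


def _call_get_event_loop(results):
    """Call asyncio.get_event_loop() and record the error message, if any."""
    try:
        asyncio.get_event_loop()
        results.append(None)
    except RuntimeError as exc:
        results.append(str(exc))


def reproduce(thread_name="AnyIO worker thread"):
    """Run the call in a fresh non-main thread that has no event loop set."""
    results = []
    worker = threading.Thread(
        target=_call_get_event_loop, args=(results,), name=thread_name
    )
    worker.start()
    worker.join()
    return results[0]


if __name__ == "__main__":
    print(reproduce())
```

If this reading is right, anything that needs an event loop at import time, such as the module-level cluster, is the part to move out of module scope.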
I also tried defining the cluster this way, as described here, but it just hangs, even when running locally:
@flow(
    task_runner=DaskTaskRunner(
        cluster_class="dask_jobqueue.SLURMCluster",
        cluster_kwargs={"queue": "fast", "cores": 2, "memory": "5 GB"},
    )
)
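One possible cause of the hang (an assumption, not verified here): the first script calls cluster.scale(2), but this version never asks for workers, and a dask-jobqueue cluster starts with none, so submitted tasks may simply wait. DaskTaskRunner accepts an adapt_kwargs argument that it passes to the cluster's adapt() after creating it. The helper name slurm_runner_kwargs and its minimum/maximum defaults are hypothetical and illustrative. Because the cluster is only created when a flow run starts, this form should also avoid the import-time error above.

```python
# Hypothetical helper for log_flow.py: lets the flow declare its own SLURM
# resources while deferring cluster creation until the flow actually runs.


def slurm_runner_kwargs(queue="fast", cores=2, memory="5 GB", minimum=1, maximum=2):
    """Build keyword arguments for prefect_dask.DaskTaskRunner.

    cluster_class/cluster_kwargs make the task runner create the SLURMCluster
    itself when a flow run starts (nothing is created at import time).
    adapt_kwargs makes it call cluster.adapt(...), so workers are actually
    requested; without this (or a scale call) the cluster starts with none.
    minimum/maximum are Dask worker counts; the defaults are illustrative.
    """
    return {
        "cluster_class": "dask_jobqueue.SLURMCluster",
        "cluster_kwargs": {"queue": queue, "cores": cores, "memory": memory},
        "adapt_kwargs": {"minimum": minimum, "maximum": maximum},
    }


if __name__ == "__main__":
    print(slurm_runner_kwargs())
```

In log_flow.py this would replace the module-level SLURMCluster(...) and cluster.scale(2) lines, with the decorator becoming @flow(task_runner=DaskTaskRunner(**slurm_runner_kwargs())). If adaptive scaling is not wanted, adding n_workers=2 to cluster_kwargs (dask-jobqueue's number of workers to start by default) should behave much like the original cluster.scale(2); neither variant has been tested against this cluster.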
Moving the cluster to a separate process and replacing cluster.scheduler_address with the running cluster's address as a hard-coded string works, but it means the flow can't define its own resource needs.
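The separate-process workaround can be sketched as a small companion script that owns the cluster. The file name start_cluster.py, the hand-off file scheduler_address.txt, and the helpers write_address/read_address are all hypothetical; the file stands in for the hard-coded string and must be visible both to this process and to wherever log_flow.py is imported, including when the deployment is built.

```python
# start_cluster.py (hypothetical name): a long-running process that owns the
# SLURM cluster so the flow module does not have to create it.
import time

# Same resources as the original log_flow.py.
CLUSTER_KWARGS = {"queue": "fast", "cores": 2, "memory": "5 GB"}
N_WORKERS = 2


def write_address(address, path):
    """Save the scheduler address so the flow module can read it."""
    with open(path, "w") as fh:
        fh.write(address + "\n")


def read_address(path):
    """Load the address, e.g. for DaskTaskRunner(address=read_address(path))."""
    with open(path) as fh:
        return fh.read().strip()


def main(path="scheduler_address.txt"):
    # Imported lazily so the helpers above work without dask-jobqueue installed.
    from dask_jobqueue import SLURMCluster

    cluster = SLURMCluster(**CLUSTER_KWARGS)
    cluster.scale(N_WORKERS)
    write_address(cluster.scheduler_address, path)
    print(cluster.scheduler_address, flush=True)
    try:
        while True:  # keep this process, and therefore the cluster, alive
            time.sleep(60)
    except KeyboardInterrupt:
        cluster.close()


if __name__ == "__main__":
    main()
```

The flow side would then use @flow(task_runner=DaskTaskRunner(address=read_address("scheduler_address.txt"))). The resource request still lives in start_cluster.py rather than in the flow, which is exactly the limitation of this approach.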
dask 2023.3.1
dask-jobqueue 0.8.1
prefect 2.10.13
prefect-dask 0.2.4
python 3.9.13
Any ideas?

Ryan Peden
06/15/2023, 12:39 PM