Andrew Rosen
06/29/2023, 4:41 PM
…prefect-dask to achieve this. Does anyone have some minimal working examples for how to dispatch individual tasks in a flow as Slurm jobs?

Daniel
06/29/2023, 9:15 PM

import asyncio

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
from dask_jobqueue import SLURMCluster

cluster_kwargs = {
    "jobs": 3,       # consumed by make_cluster below, not by SLURMCluster
    "cores": 1,      # cores per Slurm job
    "memory": "1G",  # memory per Slurm job
    "walltime": 1,   # walltime per job (Slurm reads a bare integer as minutes)
}


async def make_cluster(jobs=1, **kwargs):
    # Build the cluster, then scale it out to the requested number of Slurm jobs.
    cluster = await SLURMCluster(**kwargs)
    cluster.scale(jobs)
    return cluster


cluster = asyncio.run(make_cluster(**cluster_kwargs))


@task
def log_task(name: str):
    return name.upper()


@flow(task_runner=DaskTaskRunner(cluster.scheduler_address))
def log_flow(names: list):
    # Each .submit() hands a task to the Dask scheduler; .result() gathers them.
    futures = []
    for name in names:
        futures.append(log_task.submit(name))
    return [f.result() for f in futures]


if __name__ == "__main__":
    print(log_flow(["a", "b"]))
It's a little trickier from a deployment, since you can't use asyncio.run to create the cluster (the flow is already running inside an event loop, and asyncio.run can't be called from one). I worked around this by wrapping log_flow in an async flow, then deploying that:
@flow
async def log_flow_slurm(names: list):
    # Create the cluster inside the already-running event loop, then point
    # the inner flow's task runner at its scheduler.
    cluster = await make_cluster(**cluster_kwargs)
    return log_flow.with_options(
        task_runner=DaskTaskRunner(cluster.scheduler_address)
    )(names)
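One way to register the wrapper flow, sketched with the Prefect 2.x deployment API (the deployment and work-queue names below are placeholders, not from the thread):

# Sketch: build and register a deployment for the async wrapper flow.
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=log_flow_slurm,
    name="slurm-dask",          # placeholder deployment name
    work_queue_name="default",  # placeholder work queue
)
deployment.apply()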
Andrew Rosen
06/29/2023, 9:16 PM
…sbatch-es? or is this run some other way?
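For reference: dask-jobqueue submits one sbatch per Dask worker job, and the Prefect tasks then run on those long-lived workers rather than as per-task sbatch submissions. The batch script it generates can be inspected directly; a one-line sketch using the cluster object from above:

# Print the sbatch script dask-jobqueue renders for each worker job.
print(cluster.job_script())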
Daniel
07/24/2023, 3:51 AM

services:
  prefect-server:
    image: prefecthq/prefect:2.10-python3.11
    ports:
      - 4200:4200
    volumes:
      - prefect_test:/root/.prefect
    environment:
      PREFECT_API_URL: http://$HOSTNAME:4200/api
      PREFECT_SERVER_API_HOST: 0.0.0.0
    command: prefect server start

volumes:
  prefect_test:
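For clients and agents to reach this server, PREFECT_API_URL must point at it, e.g. with prefect config set PREFECT_API_URL=http://<host>:4200/api or from code. A sketch, with a placeholder hostname:

# Point the Prefect client at the compose-hosted server before running flows.
import os

os.environ["PREFECT_API_URL"] = "http://<host>:4200/api"  # placeholder host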