Tim Galvin
08/28/2023, 7:30 AM
task_runner parameter of a Flow?
I have a toy example where I want a specific task-decorated function to run on a different task runner than the others. I have exposed the task_runner option that exists in task.submit so that I can supply a different DaskTaskRunner, but my initial attempts either fail with "Task runner not started yet" or, with something a little more sophisticated like:
    @flow
    def my_flow():
        slim_tr = get_dask_runner(yaml_file=small)
        slim_tr.cluster_class(
            **slim_tr.cluster_kwargs
        )
        slim_tr.scale(2)
        res = my_task.submit(task_runner=slim_tr)
I get:
      File "/scratch3/gal16b/mambaforge/envs/trprefect/lib/python3.11/site-packages/aiosqlite/core.py", line 121, in run
        future = await task_runner.submit(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/scratch3/gal16b/mambaforge/envs/trprefect/lib/python3.11/site-packages/prefect_dask/task_runners.py", line 246, in submit
        get_loop(future).call_soon_threadsafe(set_exception, future, e)
      File "/scratch3/gal16b/mambaforge/envs/trprefect/lib/python3.11/asyncio/base_events.py", line 806, in call_soon_threadsafe
        self._check_closed()
      File "/scratch3/gal16b/mambaforge/envs/trprefect/lib/python3.11/asyncio/base_events.py", line 519, in _check_closed
        raise RuntimeError(
    RuntimeError: The task runner must be started before submitting work.
        raise RuntimeError('Event loop is closed')
    RuntimeError: Event loop is closed
I am trying this out because I am running some code on a Slurm-based system, and it would be ideal to minimise the compute requirements for certain stages within a single flow, which are requested via a submission to the Slurm scheduler. I would really like an MWE so I can continue to advocate for Prefect (over Bash-based scripts) with the powers-that-be within my organisation.
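To make the goal concrete, here is a rough sketch of the intended behaviour, using plain concurrent.futures pools as a stand-in for the task runners. This is not Prefect or prefect-dask API. The function names, pool sizes and the squaring/summing workload are all hypothetical and chosen only for illustration:

```python
from concurrent.futures import ThreadPoolExecutor


def heavy_stage(x: int) -> int:
    """Stand-in for a task that needs the large allocation."""
    return x * x


def slim_stage(values: list[int]) -> int:
    """Stand-in for a cheap task that should run on a small allocation."""
    return sum(values)


def my_flow(inputs: list[int]) -> int:
    # "Main" runner: a larger pool shared by most tasks (size is an assumption).
    with ThreadPoolExecutor(max_workers=8) as main_pool:
        squared = list(main_pool.map(heavy_stage, inputs))

        # Dedicated, smaller runner for one stage. The with-block scopes its
        # lifetime: work is submitted only while the pool is open, and the
        # pool is shut down when the block exits.
        with ThreadPoolExecutor(max_workers=2) as slim_pool:
            future = slim_pool.submit(slim_stage, squared)
            return future.result()
```

Calling my_flow([1, 2, 3]) runs the squaring on the large pool and the final sum on the two-worker pool. In Prefect terms, the open question is what plays the role of the inner with-block, i.e. how a second DaskTaskRunner gets started inside an already running flow before my_task is submitted to it.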