Tony Yun
10/25/2023, 7:26 PM
DaskTaskRunner argument in a pytest flow run?
@flow(
task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 1, "threads_per_worker": 8})
)
My pytest run succeeds only for the first test; all the others fail with this error:
E RuntimeError: Trying to connect to an already closed or closing Cluster LocalCluster(5f8e9acd, 'tcp://127.0.0.1:60043', workers=0, threads=0, memory=0 B).
I don’t want to manually turn it on while deploying and turn it off while doing local testing.
Chris White
import os
TASK_RUNNER = None if os.environ.get("TEST_MODE") else DaskTaskRunner(...)
@flow(task_runner=TASK_RUNNER)
This will properly short-circuit the initialization of the runner if the flag is present, otherwise it will do as you expect.
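For reference, here is a minimal sketch of that toggle pulled into a small helper so it can be checked without starting a cluster. `select_task_runner` is a hypothetical name used only for illustration, not part of Prefect, and the `prefect_dask` import path in the comments is an assumption about how the runner is installed. The runner is built through a factory, so nothing Dask-related is constructed when the flag is set.

```python
import os
from typing import Callable, Mapping, Optional, TypeVar

T = TypeVar("T")


def select_task_runner(
    factory: Callable[[], T],
    env: Mapping[str, str] = os.environ,
    flag: str = "TEST_MODE",
) -> Optional[T]:
    """Return None when `flag` is set to a non-empty value, else build the runner.

    `factory` is only called when the flag is absent or empty, so the
    runner is never constructed in test mode.
    """
    if env.get(flag):
        return None
    return factory()


# Hypothetical usage in the flow module (assumes prefect and prefect-dask):
#
#   from prefect import flow
#   from prefect_dask import DaskTaskRunner
#
#   TASK_RUNNER = select_task_runner(
#       lambda: DaskTaskRunner(
#           cluster_kwargs={"n_workers": 1, "threads_per_worker": 8}
#       )
#   )
#
#   @flow(task_runner=TASK_RUNNER)
#   def my_flow():
#       ...
```

Two things to watch: the flag is read when the flow module is imported, so it has to be set before that import happens (for example with `os.environ.setdefault("TEST_MODE", "1")` at the top of `conftest.py`), and any non-empty value counts as "on", including `"0"` or `"false"`.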
This is probably a good opportunity for us to provide some standardized hooks for testing, I'll make a note of it!
Tony Yun
10/26/2023, 1:59 PM