Sébastien Arnaud
05/20/2021, 2:17 AMclient.register_worker_plugin
allows)Chris White
def plugin_handler(f, old_state, new_state):
if new_state.is_running(): # flow run about to begin
# get dask client, register plugin if it doesn't exist
and then to attach this to your flow:
Flow("my-flow", state_handlers=[plugin_handler])
Sébastien Arnaud
05/20/2021, 12:37 PMSébastien Arnaud
05/21/2021, 12:47 AMfrom distributed.diagnostics.plugin import UploadFile
def plugin_handler(f, old_state, new_state):
if new_state.is_running(): # flow run about to begin
# get dask client, register plugin if it doesn't exist
with worker_client() as client:
client.register_worker_plugin(UploadFile("dist/mypkg.egg"))
with Flow("Test Workflow (UploadFile Plugin)", state_handlers=[plugin_handler]) as test_workflow:
random_task()
I end up getting:
ERROR - prefect.FlowRunner | Unexpected error while calling state handlers: ValueError('No workers found')
It looks like dask is not started yet when it hits the new_state.is_running(). Any ideas?