Hi, does anybody know if it is possible to registe...
# ask-community
s
Hi, does anybody know if it is possible to register Worker Plugins when connecting to a Dask Cluster? (as
client.register_worker_plugin
allows)
c
Hi Sébastien — this is definitely possible; I think the easiest place to put this hook is via a state handler on the Flow:
Copy code
def plugin_handler(f, old_state, new_state):
    if new_state.is_running(): # flow run about to begin
        # get dask client, register plugin if it doesn't exist
and then to attach this to your flow:
Copy code
Flow("my-flow", state_handlers=[plugin_handler])
s
Thank you @Chris White will try this out!
👍 1
Hi @Chris White - am I missing something here?
Copy code
from distributed.diagnostics.plugin import UploadFile

def plugin_handler(f, old_state, new_state):
    if new_state.is_running(): # flow run about to begin
        # get dask client, register plugin if it doesn't exist
        with worker_client() as client:
            client.register_worker_plugin(UploadFile("dist/mypkg.egg")) 
   

with Flow("Test Workflow (UploadFile Plugin)", state_handlers=[plugin_handler]) as test_workflow:
    random_task()
I end up getting:
Copy code
ERROR - prefect.FlowRunner | Unexpected error while calling state handlers: ValueError('No workers found')
It looks like dask is not started yet when it hits the new_state.is_running(). Any ideas?