Thread
#prefect-community
    Sébastien Arnaud

    Sébastien Arnaud

    1 year ago
    Hi, does anybody know if it is possible to register Worker Plugins when connecting to a Dask Cluster? (as
    client.register_worker_plugin
    allows)
    Chris White

    Chris White

    1 year ago
    Hi Sébastien — this is definitely possible; I think the easiest place to put this hook is via a state handler on the Flow:
    def plugin_handler(f, old_state, new_state):
        if new_state.is_running(): # flow run about to begin
            # get dask client, register plugin if it doesn't exist
    and then to attach this to your flow:
    Flow("my-flow", state_handlers=[plugin_handler])
    Sébastien Arnaud

    Sébastien Arnaud

    1 year ago
    Thank you @Chris White will try this out!
    Hi @Chris White - am I missing something here?
    from distributed.diagnostics.plugin import UploadFile
    
    def plugin_handler(f, old_state, new_state):
        if new_state.is_running(): # flow run about to begin
            # get dask client, register plugin if it doesn't exist
            with worker_client() as client:
                client.register_worker_plugin(UploadFile("dist/mypkg.egg")) 
       
    
    with Flow("Test Workflow (UploadFile Plugin)", state_handlers=[plugin_handler]) as test_workflow:
        random_task()
    I end up getting:
    ERROR - prefect.FlowRunner | Unexpected error while calling state handlers: ValueError('No workers found')
    It looks like dask is not started yet when it hits the new_state.is_running(). Any ideas?