Hi, does anybody know if it is possible to register Worker Plugins when connecting to a Dask Cluster (as client.register_worker_plugin allows)?
Chris White
05/20/2021, 4:17 AM
Hi Sébastien — this is definitely possible; I think the easiest place to put this hook is via a state handler on the Flow:
Copy code
def plugin_handler(f, old_state, new_state):
    if new_state.is_running():  # flow run about to begin
        # get dask client, register plugin if it doesn't exist
        ...
    return new_state
and then to attach this to your flow:
Copy code
Flow("my-flow", state_handlers=[plugin_handler])
Sébastien Arnaud
05/20/2021, 12:37 PM
Thank you @Chris White will try this out!
👍 1
Sébastien Arnaud
05/21/2021, 12:47 AM
Hi @Chris White - am I missing something here?
Copy code
from distributed import worker_client
from distributed.diagnostics.plugin import UploadFile
from prefect import Flow

def plugin_handler(f, old_state, new_state):
    if new_state.is_running():  # flow run about to begin
        # get dask client, register plugin if it doesn't exist
        with worker_client() as client:
            client.register_worker_plugin(UploadFile("dist/mypkg.egg"))

with Flow("Test Workflow (UploadFile Plugin)", state_handlers=[plugin_handler]) as test_workflow:
    random_task()  # task defined elsewhere
I end up getting:
Copy code
ERROR - prefect.FlowRunner | Unexpected error while calling state handlers: ValueError('No workers found')
It looks like Dask has not started yet when it hits the new_state.is_running() check. Any ideas?
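One possible workaround, not confirmed in this thread but consistent with the timing issue above: register the plugin from inside a task rather than a state handler, since tasks only run once the DaskExecutor has the cluster up, so worker_client() can reach the scheduler. The egg path is an assumption and must be readable from wherever the task executes:
Copy code
from distributed import worker_client
from distributed.diagnostics.plugin import UploadFile
from prefect import task

@task
def register_upload_plugin():
    # By the time a task runs, the Dask cluster exists, so worker_client()
    # can connect (unlike in the state handler, which fires earlier).
    with worker_client() as client:
        client.register_worker_plugin(UploadFile("dist/mypkg.egg"))
Downstream tasks would then be made to depend on register_upload_plugin so the plugin is registered before they run.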