# ask-community
m
#RubberDucking So, we're using Papermill for a lot of stuff. I think the code that runs in Papermill isn't actually being distributed by the Dask scheduler, and is basically just being treated as one function call. I guess the way to have the Papermill code get Dask-ified would be:
1. Prefect flow starts in a lightweight container
2. Prefect flow spins up a Dask cluster
3. Modify the Papermill notebooks so that they take the address of a Dask cluster as an argument, and then submit the code that runs in them to the cluster

That sound right?
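A rough sketch of that shape, assuming Prefect 1.x; the scheduler address is hypothetical and `run_notebook` here is an illustrative papermill wrapper, not a built-in:
```python
import papermill as pm
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def run_notebook(address):
    # Pass the cluster address into the notebook so the code inside it
    # can connect to the same Dask cluster and submit work directly.
    pm.execute_notebook(
        "analysis.ipynb",
        "analysis_out.ipynb",
        parameters={"address": address},
    )

with Flow("papermill-on-dask") as flow:
    run_notebook("tcp://dask-scheduler:8786")  # hypothetical address

flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))
```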
k
I think so, but you may run into problems if the code in the notebooks uses some kind of parallelization itself.
👍 1
Ah I see you said it’ll just be one function call. Have you seen the Jupyter notebook task btw? It uses papermill under the hood.
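For reference, a minimal sketch of that task as I remember it from the Prefect 1.x task library (the path and parameters are made up):
```python
from prefect import Flow
from prefect.tasks.jupyter import ExecuteNotebook

# ExecuteNotebook runs the notebook via papermill under the hood.
run_nb = ExecuteNotebook(path="analysis.ipynb")

with Flow("notebook-flow") as flow:
    run_nb(parameters={"alpha": 0.1})
```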
m
Oh, sweet!
Looks useful, but I don't think that'd do it? Like the code in the notebook gets treated as one big function call (the objects in it don't become Dask-ified).
k
I was thinking you might be able to map across notebooks if they are sequential, provided the Dask workers have access to them (maybe through the provided image)
m
I can run notebooks in parallel - what I want is for the code inside the notebooks to be turned into Delayed objects and stuff.
So here's the Dask dashboard - everything that goes on in the notebook is in `run_notebook` - I'd like for the code running in there to be executed by Dask directly.
k
Ah. This sounds like you need to do the `client.submit()` on the notebook side, right?
👍 1
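Something along these lines inside a notebook cell, assuming the scheduler address arrives as a papermill parameter named `address` (the work function is made up):
```python
from dask.distributed import Client

client = Client(address)  # `address` injected as a papermill parameter

def heavy_step(x):
    return x ** 2

# Fan the work out to the cluster instead of running it in-process.
futures = client.map(heavy_step, range(100))
results = client.gather(futures)
```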
m
Yeah, was thinking something along those lines!
@Kevin Kho Hrm, is there a way to get the address of the Dask cluster that a Prefect flow is executing on? I tried `get_client()` and `worker_client()` but neither worked.
k
What happens when you do those? Could you show me how you use them? Maybe we can try getting the address attribute of the scheduler. Will ask the team.
m
For `get_client`:
```
ValueError: No global client found and no address provided
```
For `worker_client`:
```
ValueError: No workers found
```
k
Maybe you can try creating a flow-level state handler: you get the flow object at the start, and log the executor address. If that works, then we can persist it somehow for retrieval
m
@Kevin Kho Interesting! I'd do that in the environment definition, right?
k
```python
with Flow(..., state_handlers=[...]):
    ...
```
and then create a new state handler with signature `(flow, old_state, new_state)`
m
aha! Thanks!
Time to learn about State Handlers! I've only made very simple ones so far (just one for Slack notifications when something failed).
Excellent, I'll report back!
```python
import prefect

def print_flow_object(flow, old_state, new_state):
    logger = prefect.context.get("logger")
    logger.info(f"Kwargs are {flow.environment.executor_kwargs['address']}")
```
Huzzah!
Now, how to access this from within the flow! 😅
Yes!
```python
import prefect

def pass_cluster_address(flow, old_state, new_state):
    prefect.context.parameters = {
        "address": flow.environment.executor_kwargs["address"]
    }
```
Boom!
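Presumably reading it back inside a task then looks something like this (the task name is made up):
```python
import prefect
from prefect import task

@task
def connect_to_cluster():
    # Pull the address the state handler stashed into context.
    return prefect.context.parameters["address"]
```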
k
What!? That worked? I am surprised
m
Right???
Not sure if that's how it should work for flows with actual Params though, but we'll see!
Connecting with a regular Dask `Client` object worked, but the Papermill task itself blocked execution of the tasks I submitted from within it. There's probably a way to do it, buuut in the meantime, I found that defining a little Prefect flow within the notebook and telling it to execute on the cluster wound up working! So like having a cell in the notebook with:
```python
executor = DaskExecutor(address, client_kwargs={"security": sec})
```
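A fuller sketch of that notebook cell, assuming Prefect 1.x and that `address` and `sec` (a `distributed.security.Security` object) arrive as papermill parameters; the inner tasks are made up:
```python
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def crunch(x):
    return x * 2

# A little flow defined inside the notebook...
with Flow("notebook-inner-flow") as inner_flow:
    crunch.map(list(range(10)))

# ...executed against the existing cluster rather than in-process.
executor = DaskExecutor(address, client_kwargs={"security": sec})
inner_flow.run(executor=executor)
```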
k
Gotcha