
Matt Alhonte

10/11/2021, 9:11 PM
#RubberDucking So, we're using Papermill for a lot of stuff. I think the code that runs in Papermill isn't actually being distributed with the Dask scheduler, and is basically just being treated as one function call. I guess the way to have the Papermill code get Dask-ified would be:
- Prefect flow starts in a lightweight container
- Prefect flow spins up a Dask cluster
- Modify Papermill notebooks so that they take an Address of a Dask cluster as an argument, and then submit the code that runs in them to the Cluster
That sound right?
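A minimal sketch of the third step, assuming Papermill's `execute_notebook` and its `parameters` argument. The parameter name `dask_address` and both helper names are hypothetical, and the notebook is assumed to have a cell tagged `parameters` that declares a default for `dask_address`.

```python
def build_notebook_parameters(scheduler_address, **extra):
    """Merge the Dask scheduler address into the notebook's parameters."""
    params = dict(extra)
    params["dask_address"] = scheduler_address  # hypothetical parameter name
    return params


def run_notebook(input_path, output_path, scheduler_address, **extra):
    """Execute a notebook with Papermill, handing it the scheduler address."""
    # Imported lazily so the helper above can be used without Papermill installed.
    import papermill as pm

    return pm.execute_notebook(
        input_path,
        output_path,
        parameters=build_notebook_parameters(scheduler_address, **extra),
    )
```

Inside the notebook, `dask_address` would then be an ordinary variable that a later cell can hand to a Dask client.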

Kevin Kho

10/11/2021, 9:13 PM
I think so, but you may run into problems if the code in the notebooks uses some kind of parallelization itself.
👍 1
Ah, I see you said it’ll just be one function call. Have you seen the Jupyter notebook task, btw? It uses Papermill under the hood.

Matt Alhonte

10/11/2021, 9:15 PM
Oh, sweet!
Looks useful, but I don't think that'd do it, though? Like the code in the notebook gets treated as one big function call (like the objects in it don't become Dask-ified)

Kevin Kho

10/11/2021, 10:07 PM
I was thinking you might be able to map across notebooks if they are sequential if the Dask workers have access to them (maybe through the provided image)

Matt Alhonte

10/11/2021, 10:09 PM
I can run notebooks in parallel - what I want is for the code inside the notebooks to be turned into Delayed objects and stuff.
So here's the Dask dashboard - everything that goes on in the notebook is in `run_notebook` - I'd like for the code running in there to be executed by Dask directly.

Kevin Kho

10/11/2021, 11:35 PM
Ah. This sounds like you need to do the `client.submit()` on the notebook side, right?
👍 1
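A rough sketch of what "doing the `client.submit()` on the notebook side" could look like, assuming the notebook already knows the scheduler address. `submit_to_cluster` and its `client_factory` argument are hypothetical names introduced here so the function can be exercised without a live cluster; `Client`, `Client.submit`, `Future.result`, and `Client.close` are the standard `dask.distributed` calls. It does not address whether a notebook that is itself running inside a Dask task can submit work this way.

```python
def submit_to_cluster(address, func, *args, client_factory=None, **client_kwargs):
    """Connect to an existing Dask scheduler, run func(*args) there, return the result."""
    if client_factory is None:
        # Imported lazily so this sketch loads even where Dask is not installed.
        from dask.distributed import Client

        client_factory = Client

    client = client_factory(address, **client_kwargs)
    try:
        future = client.submit(func, *args)  # schedule the call on the cluster
        return future.result()  # block until the worker returns a value
    finally:
        client.close()  # always release the connection
```

In a notebook cell this might be called as `submit_to_cluster(dask_address, my_function, my_input)`, where all three names are placeholders.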

Matt Alhonte

10/11/2021, 11:36 PM
Yeah, was thinking something along those lines!
@Kevin Kho Hrm, is there a way to get the Address of the Dask Cluster that a Prefect flow is executing on? I tried `get_client()` and `worker_client()` but neither worked.

Kevin Kho

10/12/2021, 2:55 AM
What happens when you do those? Could you show me how you use them? Maybe we can try getting the address attribute of the scheduler. Will ask the team.

Matt Alhonte

10/12/2021, 8:57 PM
For `get_client`:
ValueError: No global client found and no address provided
for `worker_client`:
ValueError: No workers found

Kevin Kho

10/12/2021, 9:06 PM
Maybe you can try creating a flow level state handler, and then you get the flow object at the start, and log the executor address. If this is possible, then we can persist it somehow for retrieval

Matt Alhonte

10/12/2021, 9:18 PM
@Kevin Kho Interesting! I'd do that in the environment definition, right?

Kevin Kho

10/12/2021, 9:19 PM
with Flow(..., state_handlers=[...]):
    ...
and then create a new state handler with signature `(Flow, old_state, new_state)`
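A hedged sketch of a handler with that signature. It assumes the scheduler address lives at `flow.environment.executor_kwargs["address"]`, which depends on how the flow is configured. The function names are hypothetical, and the standard `logging` module stands in for the Prefect logger so the sketch runs without Prefect installed.

```python
import logging

logger = logging.getLogger("cluster_address")


def find_cluster_address(flow):
    """Return the configured Dask scheduler address, or None if there isn't one."""
    # Assumption: the address is stored under flow.environment.executor_kwargs.
    environment = getattr(flow, "environment", None)
    executor_kwargs = getattr(environment, "executor_kwargs", None) or {}
    return executor_kwargs.get("address")


def log_cluster_address(flow, old_state, new_state):
    """Flow-level state handler: log the scheduler address on each state change."""
    address = find_cluster_address(flow)
    if address is not None:
        logger.info("Dask scheduler address: %s", address)
    return new_state  # hand the state back unchanged
```

It would be attached with `Flow(..., state_handlers=[log_cluster_address])`.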

Matt Alhonte

10/12/2021, 9:22 PM
aha! Thanks!
Time to learn about State Handlers! I've only made very simple ones so far (just one for Slack notifications when something failed).
Excellent, I'll report back!
import prefect

def print_flow_object(flow, old_state, new_state):
    logger = prefect.context.get("logger")
    logger.info(f"Kwargs are {flow.environment.executor_kwargs['address']}")
Huzzah!
Now, how to access this from within the flow! 😅
Yes!
def pass_cluster_address(flow, old_state, new_state):
    prefect.context.parameters = {
        "address": flow.environment.executor_kwargs["address"]
    }
Boom!

Kevin Kho

10/13/2021, 2:59 AM
What!? That worked? I am surprised

Matt Alhonte

10/13/2021, 4:54 PM
Right???
Not sure if that's how it should work for flows with actual Params though, but we'll see!
A regular Dask `client` object connected fine, but the Papermill task itself blocked execution of the tasks I submitted from within it. There's probably a way to do it, buuut in the meantime, I found defining a little Prefect flow within the Notebook and telling it to execute on the Cluster wound up working! So like having a cell in the notebook with:
executor = DaskExecutor(address, client_kwargs={"security":sec})

Kevin Kho

10/15/2021, 4:13 AM
Gotcha