Thread
#prefect-community
    Matt Alhonte

    11 months ago
    #RubberDucking So, we're using Papermill for a lot of stuff. I think the code that runs in Papermill isn't actually being distributed by the Dask scheduler - it's basically just treated as one function call. I guess the way to have the Papermill code get Dask-ified would be:
    1. The Prefect flow starts in a lightweight container.
    2. The flow spins up a Dask cluster.
    3. The Papermill notebooks are modified to take the address of the Dask cluster as an argument, and then submit the code that runs in them to that cluster.
    Does that sound right?
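    (A minimal sketch of step 3, assuming the address is injected as a Papermill parameter - the paths and the cluster_address name are illustrative, not from the thread:)

    import papermill as pm

    # Run the notebook, injecting the cluster address as a parameter.
    pm.execute_notebook(
        "analysis.ipynb",
        "analysis_out.ipynb",
        parameters={"cluster_address": "tcp://scheduler:8786"},
    )

    # A parameters-tagged cell inside the notebook could then do roughly:
    #     from dask.distributed import Client
    #     client = Client(cluster_address)  # connect to the existing cluster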
    Kevin Kho

    11 months ago
    I think so, but you may run into problems if the code in the notebooks uses some kind of parallelization itself.
    Ah, I see you said it'll just be one function call. Have you seen the Jupyter notebook task, btw? It uses papermill under the hood.
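    (For reference, a minimal sketch of that task under Prefect 1.x - the notebook path and parameters are illustrative:)

    from prefect import Flow
    from prefect.tasks.jupyter import ExecuteNotebook

    run_notebook = ExecuteNotebook()

    with Flow("notebook-flow") as flow:
        # Executes the notebook via papermill, injecting the parameters.
        run_notebook(path="analysis.ipynb", parameters={"n_rows": 1000})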
    Matt Alhonte

    11 months ago
    Oh, sweet!
    Looks useful, but I don't think that'd do it? The code in the notebook still gets treated as one big function call (the objects in it don't become Dask-ified).
    Kevin Kho

    11 months ago
    I was thinking you might be able to map across notebooks, if they're sequential and the Dask workers have access to them (maybe through the provided image).
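    (Sketched, assuming the notebook files are available to the workers, e.g. baked into the image - names illustrative:)

    from prefect import Flow
    from prefect.executors import DaskExecutor
    from prefect.tasks.jupyter import ExecuteNotebook

    notebook = ExecuteNotebook()

    with Flow("map-notebooks") as flow:
        # Each mapped child is its own task, so the DaskExecutor can run
        # the notebooks on different workers in parallel.
        notebook.map(path=["nb_a.ipynb", "nb_b.ipynb", "nb_c.ipynb"])

    flow.run(executor=DaskExecutor("tcp://scheduler:8786"))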
    Matt Alhonte

    11 months ago
    I can run notebooks in parallel - what I want is for the code inside the notebooks to be turned into Delayed objects and such.
    So here's the Dask dashboard - everything that goes on in the notebook happens inside run_notebook. I'd like the code running in there to be executed by Dask directly.
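    (What "Dask-ified" would look like, sketched with dask.delayed - the toy bodies stand in for the notebook's real steps:)

    import dask

    @dask.delayed
    def load(path):
        return list(range(10))  # stand-in for reading data

    @dask.delayed
    def transform(xs):
        return [x * 2 for x in xs]  # stand-in for the real work

    # Each delayed call becomes its own node in the task graph, so the
    # dashboard shows load/transform instead of one opaque run_notebook bar.
    result = transform(load("data.csv")).compute()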
    Kevin Kho

    11 months ago
    Ah. This sounds like you need to do the client.submit() on the notebook side, right?
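    (Sketch of that, as a notebook cell - in practice the address would arrive as a Papermill parameter; it's hard-coded here for illustration:)

    from dask.distributed import Client

    def expensive_function(x):
        return x ** 2  # stand-in for the notebook's real logic

    client = Client("tcp://scheduler:8786")  # the flow's cluster
    future = client.submit(expensive_function, 10)
    print(future.result())  # computed on a Dask worker, not in the notebook kernel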
    Matt Alhonte

    11 months ago
    Yeah, was thinking something along those lines!
    @Kevin Kho Hrm, is there a way to get the address of the Dask cluster that a Prefect flow is executing on? I tried get_client() and worker_client(), but neither worked.
    Kevin Kho

    11 months ago
    What happens when you do those? Could you show me how you use them? Maybe we can try getting the address attribute of the scheduler. Will ask the team.
    Matt Alhonte

    11 months ago
    For get_client():
    ValueError: No global client found and no address provided
    For worker_client():
    ValueError: No workers found
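    (Context for those errors: both helpers come from dask.distributed and expect to run from code that's already attached to a cluster, e.g. inside a task on a worker - called with no client around, they fail like this:)

    from dask.distributed import get_client, worker_client

    client = get_client()            # ValueError: No global client found and no address provided

    with worker_client() as client:  # ValueError: No workers found
        pass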
    Kevin Kho

    11 months ago
    Maybe you can try creating a flow-level state handler - you get the flow object at the start, so you can log the executor address. If that's possible, then we can persist it somehow for retrieval.
    Matt Alhonte

    11 months ago
    @Kevin Kho Interesting! I'd do that in the environment definition, right?
    Kevin Kho

    11 months ago
    with Flow(..., state_handlers=[...]):
         ...
    and then create a new state handler with signature
    (Flow, old_state, new_state)
    Matt Alhonte

    11 months ago
    aha! Thanks!
    Time to learn about State Handlers! I've only made very simple ones so far (just one for Slack notifications when something failed).
    Excellent, I'll report back!
    import prefect

    def print_flow_object(flow, old_state, new_state):
        logger = prefect.context.get("logger")
        logger.info(f"Kwargs are {flow.environment.executor_kwargs['address']}")
    Huzzah!
    Now, how to access this from within the flow! 😅
    Yes!
    def pass_cluster_address(flow, old_state, new_state):
        prefect.context.parameters = {
            "address": flow.environment.executor_kwargs["address"]
        }
    Boom!
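    (The consuming side, sketched - assuming the context assignment above actually survives into the task runs, which is the open question below:)

    import prefect
    from prefect import task
    from dask.distributed import Client

    @task
    def use_cluster():
        # Read back the address the state handler stashed into context,
        # then connect to the flow's own cluster from inside a task.
        address = prefect.context.parameters["address"]
        client = Client(address)
        return client.submit(sum, [1, 2, 3]).result()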
    Kevin Kho

    11 months ago
    What!? That worked? I am surprised
    Matt Alhonte

    11 months ago
    Right???
    Not sure if that's how it should work for flows with actual Params though, but we'll see!
    Using a regular Dask client object connected, but the Papermill task itself blocked execution of the tasks I submitted from within it. There's probably a way around that, but in the meantime, I found that defining a little Prefect flow within the notebook and telling it to execute on the cluster wound up working! So, having a cell in the notebook like:
    executor = DaskExecutor(address, client_kwargs={"security": sec})
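    (Filled out, a sketch of that cell - assuming address and sec (a distributed Security object) arrive as Papermill parameters, with heavy_step standing in for the real work, and the import path per Prefect ~0.14/1.x:)

    from prefect import Flow, task
    from prefect.executors import DaskExecutor

    @task
    def heavy_step(x):
        return x ** 2  # stand-in for the notebook's real logic

    # address and sec arrive as papermill-injected parameters
    executor = DaskExecutor(address, client_kwargs={"security": sec})

    with Flow("inner-notebook-flow") as inner_flow:
        heavy_step.map([1, 2, 3])

    inner_flow.run(executor=executor)  # runs the tasks on the existing Dask cluster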
    Kevin Kho

    11 months ago
    Gotcha