
Joe Schmid

04/24/2020, 6:16 PM
Kinda low-level question on Environments. If I want to access the parameters for a flow run in an Environment, it looks like I could do the following in the Environment's `execute()` (taken from CloudFlowRunner):
def execute(  # type: ignore
    self, storage: "Storage", flow_location: str, **kwargs: Any  # type: ignore
) -> None:
    flow_run_id = prefect.context.get("flow_run_id")

    try:
        flow_run_info = self.client.get_flow_run_info(flow_run_id)
    except Exception as exc:
        self.logger.debug(
            "Failed to retrieve flow state with error: {}".format(repr(exc))
        )
        # `state` isn't passed into execute(), so build the Failed state here
        state = Failed(
            message="Could not retrieve state from Prefect Cloud", result=exc
        )
        raise ENDRUN(state=state)

    updated_parameters = flow_run_info.parameters or {}  # type: ignore
Is there a better way to do this than fetching the flow run info?

josh

04/24/2020, 6:20 PM
Not a better way I know of, I think this is correct. The environment essentially “lives” outside of the flow run so it would have to query for this information since it’s not readily available in context at that point

Joe Schmid

04/24/2020, 6:22 PM
Thanks @josh. That makes sense and should be fine. I have an idea I'm really excited about where a user of `DaskCloudProviderEnvironment` could provide a `Callable`, allowing them to examine the flow run parameters and then make adjustments to Dask cluster resources (# workers, CPU/mem, etc.) to size the cluster appropriately right before the flow executes.
i.e. if I'm mapping over 1,000 elements in the flow run parameters I'd like more workers than if I'm mapping over 10, etc.
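A rough sketch of what such a Callable could look like; the hook name, its signature, and the parameter heuristic here are hypothetical, not the actual DaskCloudProviderEnvironment API:

from typing import Any, Dict


def size_cluster(parameters: Dict[str, Any], provider_kwargs: Dict[str, Any]) -> None:
    """Inspect flow run parameters and tweak Dask cluster kwargs before the run starts."""
    # Hypothetical: assume the flow maps over a parameter named "items"
    n_items = len(parameters.get("items", []))
    # Rough heuristic: one worker per 100 mapped elements, between 1 and 20 workers
    provider_kwargs["n_workers"] = min(max(n_items // 100, 1), 20)


# A user might then pass the callable when building the environment, e.g.:
# environment = DaskCloudProviderEnvironment(..., on_execute=size_cluster)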

josh

04/24/2020, 6:25 PM
How does dask cloud provider handle autoscaling in terms of knowing when to scale up towards max workers? Like do you know if it works similarly to the DaskK8s project, where it adjusts based on the amount of work that needs to be done?

Brett Naul

04/24/2020, 6:26 PM
I did something similar recently fwiw. In our case it's basically a dict of override params that get forwarded along to the `helm install --set ...` command that starts our cluster. Felt a little gross/questionable, dunno if there's value in a distinction between parameters/meta-parameters like these
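A minimal sketch of that approach, assuming a plain dict of overrides and shelling out to helm; the release name, chart, and override keys are illustrative:

import subprocess
from typing import Dict


def helm_install_with_overrides(release: str, chart: str, overrides: Dict[str, str]) -> None:
    """Turn a dict of override params into `helm install --set` flags and run the install."""
    set_args = []
    for key, value in overrides.items():
        set_args += ["--set", "{}={}".format(key, value)]
    subprocess.run(["helm", "install", release, chart, *set_args], check=True)


# e.g. helm_install_with_overrides("dask", "dask/dask", {"worker.replicas": "20"})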

Joe Schmid

04/24/2020, 6:27 PM
@josh DCP just gives you the usual control of: 1. manually set workers any time with `cluster.scale(n)` 2. adaptive mode with a min & max (# workers controlled by the Dask scheduler). For our purposes in `DaskCloudProviderEnvironment` I'm making either #1 or #2 happen just prior to flow run execution.
(And for that matter I'm also creating the entire Dask cluster dynamically just prior to flow run execution)
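A minimal sketch of those two options with dask-cloudprovider's FargateCluster (constructor kwargs trimmed for brevity; the import path and arguments may differ by version):

from dask_cloudprovider import FargateCluster

# Create the cluster dynamically just before flow run execution
cluster = FargateCluster(image="prefecthq/prefect:latest")

# Option 1: set a fixed number of workers explicitly
cluster.scale(10)

# Option 2: adaptive mode, where the Dask scheduler scales between a min and max
cluster.adapt(minimum=2, maximum=20)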
@josh
"do you know if it works similar to the DaskK8s project where it adjusts based on the amount of work that needs to done"
With adaptive mode, yes.
My sense is that adaptive mode often does not do a fantastic job of sizing, especially when worker startup time is relatively long (like with Fargate).