Joe Schmid
04/24/2020, 6:16 PM
(adapted from CloudFlowRunner):
# Needs (Prefect 0.x import paths): import prefect;
# from prefect.engine.runner import ENDRUN; from prefect.engine.state import Failed
def execute(  # type: ignore
    self, storage: "Storage", flow_location: str, **kwargs: Any  # type: ignore
) -> None:
    flow_run_id = prefect.context.get("flow_run_id")
    try:
        flow_run_info = self.client.get_flow_run_info(flow_run_id)
    except Exception as exc:
        self.logger.debug(
            "Failed to retrieve flow state with error: {}".format(repr(exc))
        )
        # `state` is not defined earlier in this method, so build the Failed state directly
        state = Failed(
            message="Could not retrieve state from Prefect Cloud", result=exc
        )
        raise ENDRUN(state=state)
    updated_parameters = flow_run_info.parameters or {}  # type: ignore

Is there a better way to do this than fetching the flow run info?
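
For reference, the lookup above boils down to a single Client call. A minimal standalone sketch, assuming the Prefect 0.x Client and a placeholder flow run id:

import prefect
from prefect import Client

client = Client()

# Inside a flow run the id comes from prefect.context; the fallback here is a
# placeholder purely for illustration.
flow_run_id = prefect.context.get("flow_run_id", "<flow-run-id>")

flow_run_info = client.get_flow_run_info(flow_run_id)
parameters = flow_run_info.parameters or {}
print(parameters)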

josh
04/24/2020, 6:20 PM

Joe Schmid
04/24/2020, 6:22 PM
Users of DaskCloudProviderEnvironment could provide a Callable, allowing them to examine the flow run parameters and then make adjustments to Dask cluster resources (# workers, CPU/mem, etc.) to size the cluster appropriately right before the flow executes.
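
A rough sketch of what such a hook could look like. The function name, signature, and parameter names below are illustrative only, not an existing DaskCloudProviderEnvironment API:

from typing import Any, Dict

def size_cluster_from_parameters(
    parameters: Dict[str, Any], cluster_kwargs: Dict[str, Any]
) -> None:
    # Hypothetical callback: inspect the flow run's parameters and mutate the
    # keyword arguments that will be passed to the Dask cluster provider.
    n_files = int(parameters.get("n_files", 1))
    # e.g. one worker per 100 files, bounded between 1 and 20 workers
    cluster_kwargs["n_workers"] = min(20, max(1, n_files // 100))

The environment would invoke such a callable just before creating the cluster, passing in the flow run's parameters and the cluster keyword arguments it is about to use.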

josh
04/24/2020, 6:25 PM

Brett Naul
04/24/2020, 6:26 PM
For us it's a helm install --set ... command that starts our cluster. Felt a little gross/questionable; dunno if there's value in a distinction between parameters/meta-parameters like these.

Joe Schmid
04/24/2020, 6:27 PM
In DaskCloudProviderEnvironment I'm making either #1 or #2 happen just prior to flow run execution.
"do you know if it works similar to the DaskK8s project where it adjusts based on the amount of work that needs to be done"
With adaptive mode, yes.
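
On the adaptive-mode point: Dask clusters, including the cloud-provider clusters that DaskCloudProviderEnvironment wraps, expose adapt(), which scales the worker count up and down based on how much work the scheduler has queued. A minimal sketch, with an illustrative image name and bounds:

from dask_cloudprovider import FargateCluster

# Arguments are illustrative; real values depend on your AWS setup.
cluster = FargateCluster(image="prefecthq/prefect:latest", n_workers=1)

# Adaptive mode: scale between 1 and 10 workers based on queued work,
# similar to how dask-kubernetes adapts.
cluster.adapt(minimum=1, maximum=10)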