Joe Schmid
04/24/2020, 6:16 PM
CloudFlowRunner):
def execute(  # type: ignore
    self, storage: "Storage", flow_location: str, **kwargs: Any  # type: ignore
) -> None:
    flow_run_id = prefect.context.get("flow_run_id")
    try:
        # Fetch the flow run's info (including its parameters) from Prefect Cloud
        flow_run_info = self.client.get_flow_run_info(flow_run_id)
    except Exception as exc:
        self.logger.debug(
            "Failed to retrieve flow state with error: {}".format(repr(exc))
        )
        # No prior state exists in this method, so always build a Failed state
        state = Failed(
            message="Could not retrieve state from Prefect Cloud", result=exc
        )
        raise ENDRUN(state=state)
    # Fall back to an empty dict when the flow run has no parameters
    updated_parameters = flow_run_info.parameters or {}  # type: ignore
Is there a better way to do this than fetching the flow run info?
josh
04/24/2020, 6:20 PM
Joe Schmid
04/24/2020, 6:22 PM
DaskCloudProviderEnvironment could provide a Callable, allowing them to examine the flow run parameters, and then make adjustments to dask cluster resources (# workers, cpu/mem, etc.) to size the cluster appropriately right before the flow executes.
Joe Schmid
04/24/2020, 6:24 PM
josh
04/24/2020, 6:25 PMBrett Naul
04/24/2020, 6:26 PMhelm install --set ...
command that starts our cluster. felt a little gross/questionable, dunno if there's value in a distinction btwn parameters/meta-parameters like theseJoe Schmid
04/24/2020, 6:27 PM
DaskCloudProviderEnvironment
I'm making either #1 or #2 happen just prior to flow run execution.
Joe Schmid
04/24/2020, 6:28 PM
Joe Schmid
04/24/2020, 6:29 PM
"do you know if it works similar to the DaskK8s project where it adjusts based on the amount of work that needs to be done"
With adaptive mode, yes.
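The Callable idea from earlier in the thread (a user-supplied function that looks at the flow run parameters and adjusts cluster resources right before the flow executes) could look roughly like the sketch below. It is plain Python and does not use Prefect or Dask. The names `size_cluster` and `prepare_cluster_kwargs`, the callback signature, and the `num_items`, `high_memory`, `n_workers`, and `worker_mem` keys are all hypothetical, chosen only for illustration.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical callback signature: receives the flow run parameters and a
# mutable dict of cluster settings, and adjusts the settings in place.
SizingCallback = Callable[[Dict[str, Any], Dict[str, Any]], None]


def size_cluster(parameters: Dict[str, Any], cluster_kwargs: Dict[str, Any]) -> None:
    """Example user callback: one worker per 1000 items, clamped to 1..20."""
    num_items = int(parameters.get("num_items", 0))
    workers = -(-num_items // 1000)  # ceiling division
    cluster_kwargs["n_workers"] = min(20, max(1, workers))
    if parameters.get("high_memory", False):
        cluster_kwargs["worker_mem"] = 16384  # illustrative value, in MiB


def prepare_cluster_kwargs(
    parameters: Dict[str, Any],
    defaults: Dict[str, Any],
    callback: Optional[SizingCallback] = None,
) -> Dict[str, Any]:
    """Copy the default settings and let the callback adjust the copy."""
    kwargs = dict(defaults)  # shallow copy so the defaults stay untouched
    if callback is not None:
        callback(parameters, kwargs)
    return kwargs


# Usage: a flow run with 2500 items gets 3 workers instead of the default 1.
settings = prepare_cluster_kwargs({"num_items": 2500}, {"n_workers": 1}, size_cluster)
```

Letting the callback mutate a copy of the defaults keeps the environment's configured values intact between flow runs, so each run is sized from the same baseline.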
Joe Schmid
04/24/2020, 6:30 PM