Pedro Martins
11/15/2021, 11:57 AM
raise KeyError(
KeyError: 'Task slug RunSimulationTask-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
simulation_flow.py
SimulationFlow = Flow(
    "RunSimulationFlowDemo-3",
    storage=Local(stored_as_script=True, path=__file__),
)
----
simulation_task.py
class RunSimulationTask(Task):
    def __init__(
        self,
        simulation_run_parameters: SimulationRunParameters,
        simulation_gateways: SimulationGateways,
        factory_scheme_gateways: FactorySchemeGateways,
        map_gtw: MAPDataGatewayBase = None,
        **kwargs,
    ):
        super(RunSimulationTask, self).__init__(name="RunSimulationTask")
        self.simulation_run_parameters = simulation_run_parameters.to_dict()
        self.simulation_gateways = simulation_gateways
        self.factory_scheme_gateways = factory_scheme_gateways
        self.map_gtw = map_gtw or MAPDataGateway()

    def run(self) -> str:
        flow_run_id = prefect.context.get("flow_run_id")
        self.logger.info(f"flow_run_id: {flow_run_id}")
        simulation_controller = SimulationController(
            **self.factory_scheme_gateways.to_dict(),
            **self.simulation_gateways.to_dict(),
            map_gtw=self.map_gtw,
        )
        self.logger.info(f"Run simulation task. [{self.simulation_run_parameters}]")
        simulation_controller.run(
            **self.simulation_run_parameters, flow_run_id=flow_run_id
        )
        return flow_run_id
----
simulation_manager.py
task = RunSimulationTask(
    simulation_run_parameters=simulation_run_parameters,
    simulation_gateways=self._simulation_gateways,
    factory_scheme_gateways=self._factory_scheme_gateways,
    map_gtw=self._map_gtw,
)
SimulationFlow.add_task(task)
SimulationFlow.run_config = self.flow_config.run_config
SimulationFlow.executor = self.flow_config.executor
SimulationFlow.state_handlers = self.flow_config.state_handlers
I’m trying to register and run this flow using client.register() and client.create_flow_run().
One weird thing I noticed is that the Flow is not being saved to my local ~/.prefect/flows directory. Could that be the cause?
Anna Geller
11/15/2021, 12:14 PM
storage=Local(stored_as_script=True, path=__file__),
1) Did you change the flow without re-registering it?
2) Did you register the flow without updating it in your storage location (if applicable)?
What is your run configuration? Do you always run it on a local agent deployed on your development machine?
Pedro Martins
11/15/2021, 12:54 PM
~/.prefect/flows as per documentation.
run_config: RunConfig = LocalRun(
    env={
        "PYTHONPATH": "api:simulation:shared",
        "DB_CONNECTION": "connection-string",
        "MAP_DB_CONNECTION": "connection-string",
    }
)
Anna Geller
11/15/2021, 1:09 PM
flow.register(build=False)
3. Your run configuration has no labels, which is a bit dangerous. I would recommend adding a label explicitly on both the local agent and the flow to avoid any issues.
4. The path ~/.prefect/flows is used when you use pickled storage, but you are using stored_as_script=True. This docstring should clear up the confusion: https://docs.prefect.io/api/latest/storage.html#local
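
The thread as captured does not show how the issue was resolved. One plausible reading, offered only as a sketch: with stored_as_script=True the flow is rebuilt at run time by executing simulation_flow.py, so a task added later from simulation_manager.py (via SimulationFlow.add_task) would exist at registration time but not in the freshly loaded flow. The toy model below illustrates that mismatch in plain Python. ToyFlow, load_flow_from_script, register, and run are hypothetical names invented for the illustration; none of them are Prefect APIs, and the slug numbering only mirrors the shape seen in the error message.

```python
# Toy model only: none of these names are Prefect APIs.
from typing import Dict, Iterable, List


class ToyFlow:
    """Minimal stand-in for a flow: a name plus an ordered list of task names."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.task_names: List[str] = []

    def add_task(self, task_name: str) -> None:
        self.task_names.append(task_name)

    def slugs(self) -> List[str]:
        # "<task name>-<n>" mirrors the "RunSimulationTask-1" shape in the error.
        counts: Dict[str, int] = {}
        result: List[str] = []
        for name in self.task_names:
            counts[name] = counts.get(name, 0) + 1
            result.append(f"{name}-{counts[name]}")
        return result


def load_flow_from_script() -> ToyFlow:
    """Stand-in for re-executing simulation_flow.py: the flow starts with no tasks."""
    return ToyFlow("RunSimulationFlowDemo-3")


def register(flow: ToyFlow) -> List[str]:
    """Stand-in for the backend: remember the slugs seen at registration time."""
    return flow.slugs()


def run(registered_slugs: Iterable[str], loaded: ToyFlow) -> None:
    """Fail if the backend expects a slug that the loaded flow does not contain."""
    available = set(loaded.slugs())
    for slug in registered_slugs:
        if slug not in available:
            raise KeyError(f"Task slug {slug} is not found in the current Flow.")


# Registering process: the manager adds the task, then registers.
flow = load_flow_from_script()
flow.add_task("RunSimulationTask")
registered = register(flow)

# Run-time process: only the script is executed, so the added task is missing.
try:
    run(registered, load_flow_from_script())
except KeyError as exc:
    print(exc)
```

If this reading applies, defining the task inside simulation_flow.py itself, so that executing the file yields the complete flow, would keep the registered and loaded versions in sync. That is a suggestion under the stated assumption, not something confirmed in the thread.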