Hey guys! Can anyone help me address the following...
# prefect-server
p
Hey guys! Can anyone help me address the following error in the Local Agent:
Copy code
raise KeyError(
KeyError: 'Task slug RunSimulationTask-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
Copy code
simulation_flow.py
    SimulationFlow = Flow(
       "RunSimulationFlowDemo-3",
        storage=Local(stored_as_script=True, path=__file__),
     )
----
simulation_task.py
class RunSimulationTask(Task):
    def __init__(
        self,
        simulation_run_parameters: SimulationRunParameters,
        simulation_gateways: SimulationGateways,
        factory_scheme_gateways: FactorySchemeGateways,
        map_gtw: MAPDataGatewayBase = None,
        **kwargs,
    ):
        super(RunSimulationTask, self).__init__(name="RunSimulationTask")
        self.simulation_run_parameters = simulation_run_parameters.to_dict()
        self.simulation_gateways = simulation_gateways
        self.factory_scheme_gateways = factory_scheme_gateways
        self.map_gtw = map_gtw or MAPDataGateway()

    def run(self) -> str:
        flow_run_id = prefect.context.get("flow_run_id")
        <http://self.logger.info|self.logger.info>(f"flow_run_id: {flow_run_id}")

        simulation_controller = SimulationController(
            **self.factory_scheme_gateways.to_dict(),
            **self.simulation_gateways.to_dict(),
            map_gtw=self.map_gtw,
        )

        <http://self.logger.info|self.logger.info>(f"Run simulation task. [{self.simulation_run_parameters}]")
        simulation_controller.run(
            **self.simulation_run_parameters, flow_run_id=flow_run_id
        )

        return flow_run_id

----
simulation_manager.py

        task = RunSimulationTask(
            simulation_run_parameters=simulation_run_parameters,
            simulation_gateways=self._simulation_gateways,
            factory_scheme_gateways=self._factory_scheme_gateways,
            map_gtw=self._map_gtw,
        )
        SimulationFlow.add_task(task)

        SimulationFlow.run_config = self.flow_config.run_config
        SimulationFlow.executor = self.flow_config.executor
        SimulationFlow.state_handlers = self.flow_config.state_handlers
I’m trying to register and run this flow using the
client.register()
and
client.create_flow_run()
. One weird thing I noticed is that the Flow is not being saved to my local
~/.prefect/flows
directory. Could be that?
a
Since you defined an explicit path for your Local storage, it will be stored and used from there, rather than from ~/.prefect/flows:
Copy code
storage=Local(stored_as_script=True, path=__file__),
@Pedro Martins and regarding the error, I think the error message gives us some really good clues to check:
Copy code
1) Did you change the flow without re-registering it?
2) Did you register the flow without updating it in your storage location (if applicable)?
What is your run configuration? Do you always run it on a local agent deployed on your development machine?
p
Hey @Anna Geller! The path I set is the direct path to the location of the flow file and not the directory. The directory the flows will be stored in should be
~/.prefect/flows
as per documentation.
This is the RunConfiguration.
Copy code
run_config: RunConfig = LocalRun(
    env={
        "PYTHONPATH": "api:simulation:shared",
        "DB_CONNECTION": "connection-string",
        "MAP_DB_CONNECTION": "connection-string",
    }
)
1. Every time I run the code I register the flow. 2. How to make sure the local storage is always update?
a
There are a couple of things to discuss here: 1. Why do you register a new version every time you run the flow? Reregistration is only required if some metadata such as storage, run config and your tasks and edges changed since the last registration 2. As long as you use stored_as_script=True, your agent should pick up the flow file from the path that you specify. In general, every time you register, by default you always build the storage unless you set
flow.register(build=False)
3. You run configuration has no labels - this is a bit dangerous, I would recommend to add a label on the local agent and on the flow explicitly to avoid any issues 4. The path ~/.prefect/flows is used when you use pickled storage, but you use stored_as_script=True. This docstring should clarify the confusion: https://docs.prefect.io/api/latest/storage.html#local
👀 1