Pedro Martins

    10 months ago
    Hey guys! Can anyone help me address the following error in the Local Agent:
    raise KeyError(
    KeyError: 'Task slug RunSimulationTask-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
    simulation_flow.py
        SimulationFlow = Flow(
           "RunSimulationFlowDemo-3",
            storage=Local(stored_as_script=True, path=__file__),
         )
    ----
    simulation_task.py
    class RunSimulationTask(Task):
        def __init__(
            self,
            simulation_run_parameters: SimulationRunParameters,
            simulation_gateways: SimulationGateways,
            factory_scheme_gateways: FactorySchemeGateways,
            map_gtw: MAPDataGatewayBase = None,
            **kwargs,
        ):
            super(RunSimulationTask, self).__init__(name="RunSimulationTask")
            self.simulation_run_parameters = simulation_run_parameters.to_dict()
            self.simulation_gateways = simulation_gateways
            self.factory_scheme_gateways = factory_scheme_gateways
            self.map_gtw = map_gtw or MAPDataGateway()
    
        def run(self) -> str:
            flow_run_id = prefect.context.get("flow_run_id")
            self.logger.info(f"flow_run_id: {flow_run_id}")
    
            simulation_controller = SimulationController(
                **self.factory_scheme_gateways.to_dict(),
                **self.simulation_gateways.to_dict(),
                map_gtw=self.map_gtw,
            )
    
            self.logger.info(f"Run simulation task. [{self.simulation_run_parameters}]")
            simulation_controller.run(
                **self.simulation_run_parameters, flow_run_id=flow_run_id
            )
    
            return flow_run_id
    
    ----
    simulation_manager.py
    
            task = RunSimulationTask(
                simulation_run_parameters=simulation_run_parameters,
                simulation_gateways=self._simulation_gateways,
                factory_scheme_gateways=self._factory_scheme_gateways,
                map_gtw=self._map_gtw,
            )
            SimulationFlow.add_task(task)
    
            SimulationFlow.run_config = self.flow_config.run_config
            SimulationFlow.executor = self.flow_config.executor
            SimulationFlow.state_handlers = self.flow_config.state_handlers
    I’m trying to register and run this flow using client.register() and client.create_flow_run(). One odd thing I noticed is that the Flow is not being saved to my local ~/.prefect/flows directory. Could that be the cause?
    Anna Geller

    10 months ago
    Since you defined an explicit path for your Local storage, it will be stored and used from there, rather than from ~/.prefect/flows:
    storage=Local(stored_as_script=True, path=__file__),
    @Pedro Martins and regarding the error, I think the error message gives us some really good clues to check:
    1) Did you change the flow without re-registering it?
    2) Did you register the flow without updating it in your storage location (if applicable)?
    What is your run configuration? Do you always run it on a local agent deployed on your development machine?
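
    To make the two questions above concrete, here is a toy model of the check the error message describes. It is not Prefect's implementation: the `<name>-<n>` slug scheme and the helper names `task_slugs` and `missing_slugs` are assumptions for illustration only. It compares the task slugs recorded at registration with the slugs found when the flow is rebuilt from storage, assuming the stored script defines the flow without the task.

```python
from collections import Counter


def task_slugs(task_names):
    """Assign '<name>-<n>' slugs in the order tasks were added (toy scheme)."""
    counts = Counter()
    slugs = []
    for name in task_names:
        counts[name] += 1
        slugs.append(f"{name}-{counts[name]}")
    return slugs


def missing_slugs(registered, loaded):
    """Return slugs the backend knows about but the loaded flow lacks."""
    loaded_set = set(loaded)
    return [slug for slug in registered if slug not in loaded_set]


# Slugs recorded at registration time (the task had been added to the flow).
registered = task_slugs(["RunSimulationTask"])
# Slugs found when the flow is rebuilt from storage; here we assume the
# stored script defines the flow without that task.
loaded = task_slugs([])

print(missing_slugs(registered, loaded))  # ['RunSimulationTask-1']
```

    Any slug in that list is one the backend expects but the loaded flow cannot provide, which is the situation the KeyError reports.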
    Pedro Martins

    10 months ago
    Hey @Anna Geller! The path I set is the direct path to the flow file, not a directory. According to the documentation, the directory the flows are stored in should be ~/.prefect/flows.
    This is the RunConfiguration.
    run_config: RunConfig = LocalRun(
        env={
            "PYTHONPATH": "api:simulation:shared",
            "DB_CONNECTION": "connection-string",
            "MAP_DB_CONNECTION": "connection-string",
        }
    )
    1. Every time I run the code, I register the flow.
    2. How can I make sure the local storage is always up to date?
    Anna Geller

    10 months ago
    There are a couple of things to discuss here:
    1. Why do you register a new version every time you run the flow? Re-registration is only required if some metadata, such as the storage, the run config, or your tasks and edges, changed since the last registration.
    2. As long as you use stored_as_script=True, your agent should pick up the flow file from the path that you specify. In general, every time you register, you build the storage by default unless you set flow.register(build=False).
    3. Your run configuration has no labels, which is a bit dangerous. I would recommend adding a label explicitly on both the local agent and the flow to avoid any issues.
    4. The path ~/.prefect/flows is used when you use pickled storage, but you use stored_as_script=True. This docstring should clarify the confusion: https://docs.prefect.io/api/latest/storage.html#local
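
    As a rough illustration of why a run config without labels can be risky, the sketch below models label matching as a subset check: an agent may pick up a flow run only if every label on the flow is also on the agent. This is my understanding of the rule, not Prefect's code; the helper `agent_can_run` and the label names are hypothetical.

```python
def agent_can_run(agent_labels, flow_labels):
    """Toy rule: the flow's labels must all be present on the agent."""
    return set(flow_labels) <= set(agent_labels)


# Hypothetical label names, chosen only for this example.
dev_agent = ["pedro-dev"]
other_agent = ["someone-else"]

# A flow with no labels matches every agent under this rule, so a run
# may land on a machine that has a different copy of the flow file.
unlabeled = [agent_can_run(dev_agent, []), agent_can_run(other_agent, [])]

# An explicit label pins the flow to agents that carry the same label.
labeled = [
    agent_can_run(dev_agent, ["pedro-dev"]),
    agent_can_run(other_agent, ["pedro-dev"]),
]

print(unlabeled, labeled)
```

    Under this rule, putting the same label on the flow's run config and on the local agent narrows the set of agents that can pick up the run.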