Hey all! I've been experimenting with Prefect for ...
# ask-community
m
Hey all! I've been experimenting with Prefect for a while now, but I'm running into the following problem (see error). My flow repeatedly retrieves data from a database and writes it to Azure. After running for about 40 minutes, the whole flow crashes, and I honestly don't understand why. The error log references my 'load_state_variable' function, which looks like this:
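import json
from datetime import datetime

from prefect import task
from prefect.variables import Variable


@task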
async def load_state_variable(state_variable_name: str, date_starttime: datetime, log):
    state_variable = await Variable.get(state_variable_name)
    if state_variable is None:
        log.info("State variable not found, creating a new one")
        state = {
            "starttime": date_starttime.isoformat(),
        }
        state_json = json.dumps(state)
        state_variable = await Variable.set(name=state_variable_name, value=state_json)
    else:
        log.info("state variable found")

    # parse the stored JSON and convert starttime back into a datetime
    state_value = json.loads(state_variable.value)
    state_value["starttime"] = datetime.fromisoformat(state_value["starttime"])
    return state_value
What could I be doing wrong? Or how can I avoid this? Thanks so much for the help!!!
n
hi @Michiel - could you please keep long tracebacks in the thread? thank you! this error means prefect thinks a given result lives in a spot on disk, when it doesn’t. most commonly:
• flow runs on container. result written to “local” storage (i.e. that container)
• flow run ends, container dies
• next flow run on new container, tries to fetch cached result but result was saved to path on container that already died, so path does not exist
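To make the sequence above concrete, here is a toy simulation using only the standard library. It is not Prefect code: `write_result`, `read_result`, and `run_twice` are hypothetical helpers, and a temporary directory stands in for a container's filesystem. The only thing it tries to show is that a recorded path stops resolving once the "container" that held it is gone, while a path on storage that outlives the container keeps working.

```python
import tempfile
import uuid
from pathlib import Path


def write_result(storage_dir: Path, data: bytes) -> Path:
    """Persist a result and return the path that gets recorded for later reads."""
    storage_dir.mkdir(parents=True, exist_ok=True)
    path = storage_dir / uuid.uuid4().hex
    path.write_bytes(data)
    return path


def read_result(path: Path) -> bytes:
    """Read a previously recorded result, failing the same way the traceback does."""
    if not path.exists():
        raise ValueError(f"Path {path} does not exist.")
    return path.read_bytes()


def run_twice(use_shared_storage: bool) -> str:
    """Write a result in one 'container', then try to read it after that container dies."""
    with tempfile.TemporaryDirectory() as shared:
        # First run: the temporary directory plays the role of the container filesystem.
        with tempfile.TemporaryDirectory() as container_fs:
            if use_shared_storage:
                storage = Path(shared)
            else:
                storage = Path(container_fs) / "storage"
            recorded_path = write_result(storage, b"cached task result")
        # The first "container" has now been deleted, together with its local storage.

        # Second run: try to fetch the result from the recorded path.
        try:
            read_result(recorded_path)
            return "found"
        except ValueError:
            return "missing"


if __name__ == "__main__":
    print("container-local storage:", run_twice(use_shared_storage=False))
    print("shared storage:", run_twice(use_shared_storage=True))
```

In Prefect terms, the usual ways out are to point result storage at something that outlives the container (for example a remote storage block) or to turn off result persistence or caching for the affected task. The exact settings depend on your Prefect version, so check the results docs for the release you are running.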
m
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 893, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/opt/prefect/capture.prefect.data-exporter/data_exporter.py", line 340, in export_flow
    state_variable = await load_state_variable(state_variable_name, date_starttime, log)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 150, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1604, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 98, in _get_state_result
    result = await state.data.get()
             ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/results.py", line 630, in get
    blob = await self._read_blob(client=client)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/results.py", line 645, in _read_blob
    content = await storage_block.read_path(self.storage_key)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/filesystems.py", line 220, in read_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path /root/.prefect/storage/0fddd8f466d344aca9670a3d7340999a does not exist.