Michiel
09/12/2024, 12:17 PM@task
async def load_state_variable(state_variable_name: str, date_starttime: datetime, log):
state_variable = await Variable.get(state_variable_name)
if state_variable is None:
log.info("State variable not found, creating a new one")
state = {
"starttime": date_starttime.isoformat(),
}
state_json = json.dumps(state)
state_variable = await Variable.set(name=state_variable_name, value=state_json)
else:
log.info("state variable found")
# converting for serializing
state_value = json.loads(state_variable.value)
state_value["starttime"] = datetime.fromisoformat(state_value["starttime"])
return state_value
what could I be doing wrong? Or how can I avoid this?
Thanks so much for the help!!!Nate
09/12/2024, 1:53 PMMichiel
09/12/2024, 1:58 PMEncountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 893, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/opt/prefect/capture.prefect.data-exporter/data_exporter.py", line 340, in export_flow
state_variable = await load_state_variable(state_variable_name, date_starttime, log)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 150, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1604, in get_task_call_return_value
return await future._result()
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 98, in _get_state_result
result = await state.data.get()
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/results.py", line 630, in get
blob = await self._read_blob(client=client)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/results.py", line 645, in _read_blob
content = await storage_block.read_path(self.storage_key)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/filesystems.py", line 220, in read_path
raise ValueError(f"Path {path} does not exist.")
ValueError: Path /root/.prefect/storage/0fddd8f466d344aca9670a3d7340999a does not exist.