# prefect-community
I’m trying to understand why my flow keeps crashing partway through, at a seemingly consistent spot. I have a parent flow that runs the first subflow fine, but halfway through the second subflow the parent fails with the exception posted in the thread. The UI still shows the second subflow as running. This is on EKS, by the way. Locally, everything runs to completion.
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 587, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "activity_processor/flow/parent.py", line 43, in api_activity_processor
    process_hourly_api(normalized_process_time, dry_run, validate_results)
  File "/usr/local/lib/python3.9/site-packages/prefect/flows.py", line 384, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 160, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 435, in create_and_begin_subflow_run
    flow_run.state.data._cache_data(await _retrieve_result(flow_run.state))
  File "/usr/local/lib/python3.9/site-packages/prefect/results.py", line 38, in _retrieve_result
    serialized_result = await _retrieve_serialized_result(state.data)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/results.py", line 34, in _retrieve_serialized_result
    return await filesystem.read_path(result.key)
  File "/usr/local/lib/python3.9/site-packages/prefect/filesystems.py", line 149, in read_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path /root/.prefect/storage/4762715b441f4b3c8011b92dc1d5361f does not exist.
It looks like your subflow is attempting to retrieve its state from a prior run instead of creating a new run.
We can see that it calls `flow_run.state.data._cache_data(await _retrieve_result(flow_run.state))`, which rehydrates the result on the state.
TL;DR: it looks like the second subflow thinks it is the first one. We’ll need a minimal example to fix this.
I think I see what might be causing some of this. Looks like the pod runs out of memory.
Is there any way to understand the memory profile of a flow? It jumps as high as 60 GB for something that should be pretty benign. 😕
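On the profiling question, one generic option that does not depend on Prefect is Python's standard-library `tracemalloc`. The sketch below wraps a call and reports the peak memory traced during it. `measure_peak` and `build_rows` are hypothetical names used only for illustration. `tracemalloc` only sees allocations made through Python's memory allocators in the process where it runs, so it would have to run inside the pod and may under-report memory held by native libraries.

```python
import tracemalloc


def measure_peak(fn, *args, **kwargs):
    """Run fn and return (result, peak_bytes) traced by tracemalloc during the call."""
    tracemalloc.start()
    try:
        result = fn(*args, **kwargs)
        # get_traced_memory() returns (current, peak) in bytes since start().
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak


def build_rows(n):
    # Hypothetical workload standing in for the body of a task or subflow.
    return [bytes(1024) for _ in range(n)]


rows, peak = measure_peak(build_rows, 1000)
print(f"peak traced memory: {peak / 1e6:.2f} MB")
```

Wrapping the body of each task or subflow this way is one rough way to see which step accounts for the jump.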
Is it possible that all of your tasks together are consuming that much memory? We do not release memory eagerly yet.
As in, when a task finishes, the memory isn’t released until the flow completes?
For tasks that are submitted as futures, no. We can’t know if you’ll need the result of the future downstream.
We might also hold onto the data for normal task calls too, as we track the state of all task runs created in a flow. With upcoming work on result handling, we can optimize that though.
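The point about futures can be illustrated with a generic analogy outside Prefect. The sketch below uses the standard library's `concurrent.futures`, not Prefect's own futures, and `Payload` and `make_payload` are hypothetical names. It shows that a result stays in memory for as long as something still references the future that produced it, and can only be freed once that reference is dropped.

```python
import gc
import weakref
from concurrent.futures import ThreadPoolExecutor


class Payload:
    """Stand-in for a large task result."""

    def __init__(self, size):
        self.data = bytearray(size)


def make_payload():
    return Payload(1_000_000)


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(make_payload)
    # Keep only a weak reference, so this code does not itself keep the result alive.
    ref = weakref.ref(future.result())

gc.collect()
# The future still stores its result, so the payload cannot be freed yet.
alive_while_future_held = ref() is not None

del future
gc.collect()
# With the last reference to the future gone, the payload can be collected.
alive_after_future_dropped = ref() is not None

print(alive_while_future_held, alive_after_future_dropped)
```

By analogy, a flow that keeps every submitted future until it completes keeps every result alive for that long as well.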
Things get better if I don’t submit any tasks as futures, but we’re still getting up to ~37GB. I also don’t see any real drop in memory between when each subflow finishes (blue lines are where a new subflow starts).
Are you returning a value from your subflow? If not, we default to a state that bundles all the task states from within the subflow
No returns, so sounds like I should?
Yeah that should help since it'll release all those tasks on completion.
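A minimal sketch of that suggestion, assuming the usual `flow` and `task` decorators from `prefect`. The names `process_chunk` and `hourly_subflow` and their parameters are hypothetical and not taken from the flow in this thread. The `ImportError` fallback exists only so the snippet still runs as plain Python where Prefect is not installed.

```python
try:
    from prefect import flow, task
except ImportError:
    # Fallback for environments without Prefect: the decorators become
    # no-ops and the functions below behave as ordinary Python functions.
    def flow(fn):
        return fn

    task = flow


@task
def process_chunk(chunk_id: int) -> bytes:
    # Hypothetical task standing in for work that produces a large result.
    return bytes(1_000_000)


@flow
def hourly_subflow(n_chunks: int = 4) -> int:
    processed = 0
    for chunk_id in range(n_chunks):
        process_chunk(chunk_id)
        processed += 1
    # Return a small explicit value so the subflow's final state carries
    # this integer rather than a bundle of all of its task states.
    return processed
```

The returned value can be anything small, such as a count or a boolean. Per the replies above, the point is only that the subflow returns something explicit.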
That definitely works to clear out some resources between the subflows. The biggest hog definitely looks like the futures. Is there any upcoming work to optimize that?
Yeah there is, I'll be working on result handling over the next month.