# ask-community
i
Hello! We had four deployments fail last night with the same error a few hours into their runs (see below). All four deployments were long-running (2+ hours) and had a large number of tasks (3.5k, 3.4k, 1.1k, and 675 respectively). These deployments were part of two different flows with two very different sets of code. One of these flows is running Prefect 2.7.5, the other is running 2.6.7. Our agent is running 2.7.5 and we are using Prefect Cloud. Any insights as to what may have happened and how to fix this issue?
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
(full traceback in thread)
Copy code
Encountered exception during execution:
Traceback (most recent call last):
 File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
  result = await run_sync(flow_call)
 File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
  return await anyio.to_thread.run_sync(
 File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
  return await get_asynclib().run_sync_in_worker_thread(
 File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
  return await future
 File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
  result = context.run(func, *args)
 File "/opt/prefect/flow.py", line 160, in entrypoint
  results.append(future.result())
 File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 226, in result
  return sync(
 File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 267, in sync
  return run_async_from_worker_thread(__async_fn, *args, **kwargs)
 File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
  return anyio.from_thread.run(call)
 File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
  return asynclib.run_async_from_thread(func, *args)
 File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
  return f.result()
 File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
  return self.__get_result()
 File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
  raise self._exception
 File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
  return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
 File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 100, in _get_state_result
  raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
k
Hi @Ilya Galperin, there was a bug in 2.7.5 that caused this error. I recommend updating immediately to 2.7.6.
i
Hi @Kalise Richmond - this is only happening intermittently on flows, and we are not running Prefect Orion with SQLite - we are on Prefect Cloud (adding clarification to my original message). It doesn’t seem like the bug linked to 2.7.5 is related?
🤔 1
z
This error occurs after the flow run finishes?
These deployments usually run without problems?
i
It looks like they were surfaced at the end of the flow runs. One of the flows (flow A, a ~2.5 hour one) usually runs without problems, and the other flow (flow B), with 3 deployments, is expected to run >4 hours per deployment. Flow B runs fine on other deployments (using the same code) when it has shorter runtimes — we’re re-trying these long-running deployments now.
Flow A, though, uses Dask, and it fails in the middle of the run before reaching its final set of tasks. After all the Dask tasks in flight when the error happened finish, the flow re-surfaces the error and doesn’t trigger any remaining downstream tasks.
z
Can you share timestamps for the failures? Are you using result persistence?
i
For flow B: 11:06:00 PM, 11:05:45 PM, 11:06:50 PM. Flow A: 10:51:27 PM.
I don’t think we’re persisting any results and we do not have retries enabled.
Unless result persistence is enabled somewhere by default that I’m not sure of?
z
It’s not often automatically enabled at the flow level
What timezone are those?
i
Sorry these are PST from last night
Jan 5th
z
🙂 thanks
i
Happy to give you run IDs if that helps at all as well
z
Yeah, that’d be helpful for identifying whether it’s a server-side issue
I suspect this is a client-side bug though, and it’s going to be hard to find without a minimal example that reproduces the problem.
i
Copy code
bad5b6dd-8f0f-428c-a045-e95cf72da4cd
82b0ca57-1501-47a2-91c8-ed621f184e82
646d0c66-320a-4435-9fb2-e0f4d03bb547
012ae57c-9296-46d0-af83-16eafc1218e0
z
Thanks! I’ll forward this to a Cloud engineer and we’ll rule out server errors.
Can you explain more about what’s going on here?
Copy code
File "/opt/prefect/flow.py", line 160, in entrypoint
  results.append(future.result())
i
Sure, this is doing something like…
Copy code
futures = []
for e, x in enumerate(mylist):
    futures.append(
        sometask.with_options(name=task_name).submit(
            x=x,
            y=y,
            z=z,
        )
    )
results = []
for future in futures:
    results.append(future.result())
So it is submitting a number of tasks to dask, then waiting for them to complete
Before moving to the next step
This is only happening in flow B; flow A is not using Dask (the traceback might actually be slightly different, let me check…)
Here is the traceback for flow A — different line in flow.py
Copy code
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/prefect/flow.py", line 228, in entrypoint
    temp_table_created = create_temp_table(
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 436, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 927, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1068, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 100, in _get_state_result
    raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
And another run in Flow A - also a different line in flow.py
Copy code
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/prefect/flow.py", line 279, in entrypoint
    cols, _ = get_table_schema(
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 436, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 927, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1068, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 100, in _get_state_result
    raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
z
Do you have in-memory storage of task results disabled?
i
Hmm I am not sure - how do I check that?
z
It’d be
Copy code
cache_result_in_memory=False
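For reference, a minimal sketch of where that option lives when it is set - it is an argument to the @task (and @flow) decorator and defaults to True; the task name here is illustrative:
Copy code
from prefect import task

# Illustrative task: with cache_result_in_memory=False, the return value is not
# kept in memory, and downstream access relies on persisted results instead.
@task(cache_result_in_memory=False)
def example_task():
    return 42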
i
I wouldn’t think so since we’re definitely storing some results in memory between tasks
z
Are you using cache keys?
i
We are not
z
😄
So I suspect the problem is that the results are being dropped from memory for some reason. Perhaps on the way back from a Dask worker.
i
Maybe - only one of the flows is using Dask though, 3 of the failures are just running the SequentialTaskRunner
Not sure if it’s relevant but the Dask one has never failed (with this error) before last night when we had all of these deployments running simultaneously - could it maybe have something to do with high load against the agent or API?
All of these deployments are running on different k8s pods as well (so 4 pods total between these 4 runs)
z
Is that latest traceback from the sequential task runner?
i
Ya, the first log was from the flow running DaskTaskRunner
The later 2 are from the default task runner (not defined in the flow) which I believe is just sequential
z
If you’re using
.submit
the default is concurrent.
Or are you just calling the tasks?
i
For the first one, we have explicitly specified the DaskTaskRunner
the other flow doesn’t use .submit
it just calls them directly
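A small sketch of the distinction being discussed, with a made-up task name: calling a task directly runs it and returns its value, while .submit() hands it to the task runner (the default ConcurrentTaskRunner when none is specified) and returns a PrefectFuture:
Copy code
from prefect import flow, task

@task
def add_one(x: int) -> int:
    return x + 1

@flow  # no task_runner given, so the default ConcurrentTaskRunner is used
def example_flow():
    value = add_one(1)             # direct call: runs the task, returns 2
    future = add_one.submit(2)     # submit: returns a PrefectFuture
    return value, future.result()  # .result() waits for the submitted run

example_flow()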
z
🤯 okay that’s really weird
Are your dependencies pinned, or could a package version have changed?
i.e. could pydantic have changed versions or something?
i
They are all pinned
z
I’m out of questions, I think. This is very peculiar.
If you can create a minimal example that reproduces it, we can debug further but I’m not sure what would cause this
i
Hmm, I can try but it might be tough. This behavior only seems to be happening when we’re running thousands of tasks simultaneously — possibly across flows in a long-running manner.
Does it sound like there were no issues with the API/cloud services around this time last night?
z
We don’t see anything suspicious
Can you take a look at these three task runs?
Copy code
9bf44e45-bcc6-413b-a67e-0ee8a1b06c77 at 02:06:30.633 EST
0424b8f8-06ff-4f78-808d-13543167632f at 02:06:00.680 EST
3ae5277e-61f1-4a10-9ff3-cf89af740aa7 at 02:05:45.138 EST
• Are they in the affected flow runs?
• What state transitions did they go through?
• Are these the task runs your sequential flows crashed on?
i
Let me check
Yep, all of them are in the affected flow runs
These are all from the sequential flows, and all the tasks are still in Pending
The actual error isn’t surfaced on the task level though, it’s attached to the flow run
But it seems like around the time the flow crashed yes
Let me confirm, but I think those are the tasks that were being called in flow.py at the time, per the tracebacks I posted above.
Yep, confirming that
z
Suspicious 🙂
Well, if these are all in PENDING states that explains the error! A pending state cannot have a result attached to it.
Are you passing data from upstream tasks into these?
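To illustrate the point above, a minimal sketch against the 2.7-era client API: a Pending state carries no data, so asking it for a result raises the same MissingResult error seen in the tracebacks:
Copy code
from prefect.exceptions import MissingResult
from prefect.states import Pending

state = Pending()
print(state.is_pending())  # True
print(state.data)          # None - no result payload is attached to the state

try:
    state.result()         # a data-less, non-failed state has nothing to return
except MissingResult as exc:
    print(exc)             # "State data is missing. ..."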
i
Let me see.. all 3 of these are different tasks in the same flow 😉
It looks like 2/3 are. The one with id
3ae5277e-61f1-4a10-9ff3-cf89af740aa7
only uses values that are passed in as arguments to the flow function.
z
Hm okay. Are there logs for these task runs? Presuming there aren’t DEBUG level logs 😢
i
The other 2 are using values that are captured from a previous task then stored in a var in flow.py
Here’s all the logs I see..
Copy code
3ae5277e-61f1-4a10-9ff3-cf89af740aa7
Task run '3ae5277e-61f1-4a10-9ff3-cf89af740aa7' already finished.

9bf44e45-bcc6-413b-a67e-0ee8a1b06c77
Task run '9bf44e45-bcc6-413b-a67e-0ee8a1b06c77' already finished.

9bf44e45-bcc6-413b-a67e-0ee8a1b06c77
Task run '9bf44e45-bcc6-413b-a67e-0ee8a1b06c77' already finished.
All INFO level
i
Not sure if it matters but the Dask flow does not have any tasks still remaining in pending state (or failed or crashed for that matter)
z
It looks like the server sent back an ABORT instruction here
i
Ah interesting..
z
That generally means the server thought the task was already being executed elsewhere
Unfortunately I’m not sure we can recover the “reason” attached to the abort response.
i
Hmm
z
I can add logs so we get more information in the future
🙏 1
i
How do you recommend we proceed for now?
z
There’s not much to do in the meantime. You can use
return_state=True
on the task call and just like.. call it again if you happen to get a pending state back, but that’s pretty hacky.
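A rough sketch of that hacky workaround, with a made-up task name standing in for the real ones - take the State back instead of the value, and re-call the task once if it unexpectedly comes back Pending:
Copy code
from prefect import flow, task

@task
def do_work(x):  # hypothetical stand-in for the real task
    return x * 2

@flow
def my_flow():
    state = do_work(21, return_state=True)      # returns a State, not the value
    if state.is_pending():                      # server left the run in Pending?
        state = do_work(21, return_state=True)  # just call it again (hacky)
    return state.result()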
i
Hmm ya, I don’t know if that would solve the issue with the Dask flow though? That one is already returning just futures
And there is no task stuck in pending there 🤷
z
Yeah I’m not sure what’s going on there yet either. I’ll add more logs in the next release and then I’d recommend running this with DEBUG logs.
i
Will do, thanks Michael.
z
I think once we get a better picture of what’s going on it’ll be an easy enough fix
i
Do we need to do anything to enable another level of debug logs? I thought cloud captures debug level logs too
Or do we need to set some env var on the flow container?
z
PREFECT_LOGGING_LEVEL=DEBUG
— we only capture what you set the level to
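For example, one way to pass that level through to the flow-run pods instead of baking it into the image - a sketch assuming a KubernetesJob infrastructure block (the block name here is made up):
Copy code
from prefect.infrastructure import KubernetesJob

k8s_job = KubernetesJob(
    env={"PREFECT_LOGGING_LEVEL": "DEBUG"},  # Cloud captures logs at this level
)
k8s_job.save("debug-logging-job", overwrite=True)  # reference this block from the deployment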
i
Our Prefect logger debug level logs are captured
z
Oh okay yeah there just aren’t debug level logs in this spot yet.
i
Gotcha
Oh duh I set that env var in the docker image for these flows 😛
Will keep up with the next release, thanks again
I’ll let you know if we’re able to reproduce
Thanks again for all the help troubleshooting this
🙏 1
z
Thank you too!
🙏 1
I’m sure you’ve caught a currently obscure bug that will become more common as people scale up, I’m eager to see what’s going on here
i
Me too 🙂
z
btw the next release will be next Thursday; I’m sure I’ll have a PR up before then if you want to test beforehand
You may want to look into getting up to the latest release in the meantime
i
Noted, thanks! We’ll upgrade ASAP.
A quick follow-up @Michael - we just experienced the same error/failure with the same 3 deployments from the Sequential task runner flow that failed last night, so it does look like the issue is reproducible. We’re going to try staggering these runs and see if the problem persists.
z
Here’s a patch with better logs https://github.com/PrefectHQ/prefect/pull/8097
👍 1
i
Thank you! We’ll try to install this PR on the container running the flow and see what we catch.
z
Should be doable with the
EXTRA_PIP_PACKAGES
variable if you’re using our image entrypoint e.g.
Copy code
docker run --env EXTRA_PIP_PACKAGES="git+https://github.com/PrefectHQ/prefect@task-run-abort-log" prefecthq/prefect:2-latest prefect version
🙏 1
i
Hi @Michael - an update from our end. We’ve upgraded to 2.7.7 and have done some more testing.
1. The number of flows we have running in parallel at the time doesn’t seem to matter; we get the same crashing behavior when running just one instance of this flow.
2. Flows are virtually always crashing at almost exactly the 4-hour mark.
3. We installed the logging PR you created, and here is the full output for the pending task:
Copy code
Task run '8775d5ff-1614-4df3-83bd-dc3b3fc4bb0f' received abort during orchestration: The enclosing flow must be running to begin task execution.. Task run is in PENDING state.
z
Ah interesting. This task run failure might be misleading here. If the flow run is marked as failed, we won’t let task runs move into a running state.
(as that abort reason suggests)
So right now we have a missing result error which is caused by a pending task run which is caused by an aborted transition which is caused by a flow run that is not in a running state anymore.
Does the flow run itself have any relevant logs?
a
we're also seeing similar weirdness in some of our flows, though we're seeing it in flows that only run for a couple of seconds. All the tasks in our flow complete successfully, then we see the same error:
Copy code
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
we're on Prefect 2.7.1 and we use Prefect Cloud. We don't have any retries set up, and we use whatever the default persistence is (the docs say that it should be enabled). The first time we saw this error was 2022-12-15 16:39:48 UTC
we've only noticed it in the last couple of days because we have a set of jobs that run monthly that we have failure notifications set up for
it's not impacting our data flows, but our on-call has to look at every failure to ensure that the job actually did what it was supposed to do, which is a bit cumbersome when you're running thousands of these flows a day
a
I just encountered this; here’s a minimal example with prefect-dask
Copy code
import dask.dataframe
import dask.distributed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

client = dask.distributed.Client()

@task
def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
    df = dask.datasets.timeseries(start, end, partition_freq="4w")
    return df

@task
def process_data(df) -> dask.dataframe.DataFrame:
    df_yearly_avg = df.groupby(df.index.year).mean()
    return df_yearly_avg.compute()

@flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
def dask_flow():
    df = read_data.submit("1988", "2022")
    df_yearly_average = process_data.submit(df)
    return df_yearly_average

dask_flow()
in this case, it's because the memory maxed out on a single worker. Wrapping with get_dask_client in my case allows it to run successfully:
Copy code
@task
def process_data(df) -> dask.dataframe.DataFrame:
    with get_dask_client():
        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()
z
@Alix Cook are you using Dask as well? @Andrew Huang since we have reproductions without Dask, I’m hesitant to go down the Dask rabbit hole. OOM on a worker, especially, is always going to cause some sort of ungraceful shutdown.
👍 1
a
no, our flows are running as local processes
i
Hi Michael - no additional logging on the flow level, unfortunately. I do notice the State Message attached to the flow is:
Flow run infrastructure exited with non-zero status code -1.
Since this is almost always crashing at the 4-hour mark, it makes me suspicious that there is some timeout we’re missing. However, our other very long-running flows are not timing out at any point. I should also note that the other flow where we saw this intermittent behavior crashed at an arbitrary time after 2 hours, so maybe not.
a
oh I missed the reproduction
z
@Ilya Galperin that sounds like the failure is being reported by the agent then the flow is crashing since the agent has marked it as failed.
You’re using Kubernetes jobs?
i
Yes
z
@Alix Cook you’re using process infrastructure or just running flows without an agent? These may be different issues.
a
we're running flows with an agent, the agent is just running them in a local process.
z
Great thanks! Your issue actually sounds distinct from Ilya’s. Can you open an issue with details like DEBUG level agent and flow run logs?
i
I’m going to try to reproduce the crash again and see if I can find anything in the container’s logs that might not be being shipped back.
a
sure
i
Hi @Zanie - unfortunately no additional logs on the container or otherwise. Not sure if it’s relevant but the pod itself is not crashing, the container spins down and the pod stays alive with a succeeded state. Any other avenues you think might be worth digging into?
z
I’m very confident this is a bug in the Kubernetes library that we will need to add a workaround for (https://github.com/PrefectHQ/prefect/issues/8050#issuecomment-1379289312)
i
Yes this looks really really similar - I think you are probably right. Thanks for flagging this Michael, we will keep track of this issue!