
Tim Enders

10/25/2022, 2:53 PM
🤔 Using the default setup for result caching with Prefect 2.6.4 and I always get this exception at the end of a Completed Flow run...
Traceback (most recent call last):
  File "/home/tenders/Documents/code/prefect-orion/platform_prefect/accounts/accounts_subscriptions.py", line 359, in <module>
    flow_result = main()
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 439, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 150, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 229, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/prefect/states.py", line 86, in _get_state_result
    raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
It seems like Prefect is still expecting the State data to exist, even though it doesn't by default? (I may be misunderstanding things too)

Zanie

10/25/2022, 3:23 PM
What do you mean by default setup for result caching? 🙂 A small example would be helpful.

Tim Enders

10/25/2022, 3:56 PM
Well, I know it used to always persist (that is the word I was looking for) the results until 2.6.x. And that was actually a problem for me for a single task in 2.0 because I couldn't turn it off. I would get a Pickle Error on the client object in the task. The solution in 1.0 was to turn off the persistence for the single task and go about your business. With 2.6 I no longer get the PickleError, but I am seeing this exception come up, as if I need to have result persistence turned on for the Flow or something? Honestly I think I might just need to read the release notes again...

Zanie

10/25/2022, 4:14 PM
We will turn on persistence when you use a feature that requires it or you can opt-in explicitly.
The documentation on results is pretty thorough: https://docs.prefect.io/concepts/results/#persisting-results
You shouldn’t get this exception in normal operation, are you returning a state manually?

Tim Enders

10/25/2022, 4:16 PM
maybe?
    try:
        bq_result = load_df_bq(bqclient, transformed_data)
    except Exception as e:
        slack_webhook_block = SlackWebhook.load("data-pipeline-notifications")
        slack_webhook_block.notify(f"BQ Loading failure on {flow_name} on {branch_name}")
        return Failed(message="Load Failure")
    else:
        if isinstance(bq_result, LoadJob) and bq_result.state == "DONE":
            return Completed(message="Load Finished!")
        elif isinstance(bq_result, LoadJob) and bq_result.state != "DONE":
            slack_webhook_block = SlackWebhook.load("data-pipeline-notifications")
            slack_webhook_block.notify(f"BQ Loading failure on {flow_name} on {branch_name}")
            return bq_result
        else:
            slack_webhook_block = SlackWebhook.load("data-pipeline-notifications")
            slack_webhook_block.notify(f"General Load Failure on {flow_name} on {branch_name}")
            return Failed(message="Load Failure")

Zanie

10/25/2022, 4:20 PM
I’d presume
return Completed(message="Load Finished!")
is the issue
If you add
data=<put some data on the state>
there you should not get this error
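
A minimal sketch of that suggestion, assuming Prefect 2.6.x, where Completed from prefect.states forwards a data keyword to the state. The helper names summarize_load and completed_with_data are hypothetical and not part of Prefect or of the flow above; the payload is a plain dict so it does not run into pickling problems like the client object did.

```python
from typing import Any, Dict, Optional


def summarize_load(job_state: Optional[str]) -> Dict[str, Any]:
    """Hypothetical helper: build a small, plain payload describing the load."""
    return {"job_state": job_state, "ok": job_state == "DONE"}


def completed_with_data(summary: Dict[str, Any]):
    """Return a Completed state that carries data, so the state is not empty."""
    # Imported lazily so summarize_load can be used without Prefect installed.
    # Assumption: Prefect 2.6.x, where Completed accepts message= and data=.
    from prefect.states import Completed

    return Completed(message="Load Finished!", data=summary)
```

Inside the flow, the Completed branch would then read something like return completed_with_data(summarize_load(bq_result.state)) instead of returning a state with no data.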

Tim Enders

10/25/2022, 8:55 PM
Thank you! That fix looks great!