Aleksandr Liadov

02/09/2023, 10:21 AM
Hello guys, I'm blocked by the error `MissingResult: State data is missing` (DaskTaskRunner). When I try to run multiple flows (~30) at the same time, some of them fail with this error. It should not be a problem with the Dask cluster, because on the same cluster I can run ~50 flows with Prefect 1 without any problem! The error is random: a flow that had this problem on the previous run won't have it on the next one. I've added a stack trace in the thread. I see on GitHub that this problem has existed for more than a year: MissingResult State data is missing. · Issue #67 · PrefectHQ/prefect-dask · GitHub. Any suggestions? I'm on the latest Prefect, 2.7.12.
Encountered exception during execution:
Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/app/flows_2/sub_flows/features_execution.py", line 263, in features_execution_flow
    features_results = [state.result() for state in states_features]
  File "/app/flows_2/sub_flows/features_execution.py", line 263, in <listcomp>
    features_results = [state.result() for state in states_features]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/schemas.py", line 107, in result
    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 73, in get_state_result
    return _get_state_result(state, raise_on_failure=raise_on_failure)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 226, in coroutine_wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 88, in _get_state_result
    raise await get_state_exception(state)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect_dask/task_runners.py", line 269, in wait
    return await future.result(timeout=timeout)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/distributed/client.py", line 289, in _result
    raise exc.with_traceback(tb)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1307, in begin_task_run
    state = await orchestrate_task_run(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1382, in orchestrate_task_run
    resolved_parameters = await resolve_inputs(parameters)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1672, in resolve_inputs
    return await run_sync_in_worker_thread(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 285, in visit_collection
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 285, in <listcomp>
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 280, in visit_collection
    items = [visit_nested(o) for o in expr]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 280, in <listcomp>
    items = [visit_nested(o) for o in expr]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 259, in visit_collection
    result = visit_fn(expr)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1670, in resolve_input
    return state.result(raise_on_failure=False, fetch=True) if return_data else None
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/schemas.py", line 107, in result
    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 73, in get_state_result
    return _get_state_result(state, raise_on_failure=raise_on_failure)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 226, in coroutine_wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 100, in _get_state_result
    raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
With debug-level logging, I see a warning message just before this crash:
Task run '32dc1400-a878-4691-8ddf-ca2efd07bd4a' received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state.
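
Not a fix, just a diagnostic sketch: the traceback shows the flow failing inside the list comprehension `[state.result() for state in states_features]`, so the first state with missing data aborts the whole collection. Assuming each state only needs to expose a `.result()` method, a small helper could gather the results one by one and record which indices raised, which may help show whether the failures line up with the task runs that logged the abort warning. The name `collect_results` is hypothetical and not part of Prefect.

```python
from typing import Any, Iterable, List, Tuple


def collect_results(
    states: Iterable[Any],
) -> Tuple[List[Any], List[Tuple[int, Exception]]]:
    """Call .result() on every state, keeping failures instead of raising on the first.

    Hypothetical helper: `states` is any iterable of objects with a .result()
    method (for example the `states_features` list from the traceback).
    """
    results: List[Any] = []
    failures: List[Tuple[int, Exception]] = []
    for index, state in enumerate(states):
        try:
            results.append(state.result())
        except Exception as exc:  # e.g. prefect.exceptions.MissingResult
            # Remember the position of the failing state and the exception itself.
            failures.append((index, exc))
    return results, failures
```

In the flow this would replace the list comprehension with `features_results, failures = collect_results(states_features)`, after which `failures` can be logged or re-raised. Since the error message itself mentions disabled result persistence, it may also be worth checking whether the affected tasks set the `persist_result` option.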