
Tim Galvin

11/16/2022, 11:59 AM
Hi all -- I have a pipeline flow that works perfectly well on one cluster, but when I run it on another I consistently see the errors below. There are no errors related to the actual work, and I am running the workflow against a known, well-understood dataset, so I am sure the problem is not there. Any ideas? It is almost as if a task ID cannot be pulled from the database or something. I am running a Postgres database and Orion server in a VM near said clusters.
Encountered exception during execution:
Traceback (most recent call last):
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/bin/process_holography.py", line 431, in holography_flow
    return_me = log_and_wait_futures(return_me)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/tasks.py", line 360, in __call__
    return enter_task_run_engine(
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 733, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/states.py", line 86, in _get_state_result
    raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.

Mathijs Carlu

11/16/2022, 12:25 PM
I came across this error after an earlier task failed with the error I posted in this thread. Because the earlier task can't report on its state, the result is missing in a task that needs it later. If you're experiencing the same, you can track the issue (also in thread). Otherwise, do you use result persistence on your task runs? And if so, where are you storing them?

Tim Galvin

11/16/2022, 12:46 PM
Interesting. So, I am mapping a set of values to a single task in a for loop, similar to your example.
return_me = [
    task_grid_holography.submit(
        beam, sbid, workdir, grid_ant, return_me=return_me
    )
    for beam, ms in enumerate(mslist_holo)
]
In the above, return_me is a dummy variable with no meaning that I am only passing around so that Prefect can track dependencies. I have recently learnt of the wait_for argument... But I am unclear whether it is the same issue. I am not seeing any earlier tasks failing. It all looks exactly like I expect. But perhaps if I up the logging level something might stick out. Just so I understand - the thinking from your thread is that task C relies on task B. Task B fails and does not update the Orion service, so when task C starts and assesses its dependencies, it is not able to find a state for task B. Bam, boom, we have an error. Is that correct? When you say result persistence on task runs, do you mean whether I am caching the results from tasks that have previously been called with the same sets of inputs? If so, I am not knowingly using this. I could delete my Prefect home and Dask workspace just to be sure, though.
The other thought I just had was a concurrency limit at the database. I have 20-odd tasks running concurrently in this loop. I am curious what happens if they all hit the database at about the same time - at the same time as the previous task that they all rely upon.

Mathijs Carlu

11/16/2022, 1:03 PM
Your understanding is correct. Upping the log level would not help (I'm assuming you always log error level and above). The task that fails to report its state always errors. By result persistence, I don't mean caching (which is what you're talking about). By default, task results are kept in memory. However, when the Kubernetes job is running out of memory, that might cause problems. Could you try setting cache_result_in_memory to False and persist_result to True in your '@task()' decorator?

Tim Galvin

11/16/2022, 1:10 PM
Thanks for that. Let me try out your second suggestion. I am reasonably certain no tasks are failing.
I am testing this setup out on a new SLURM cluster, so I could very well have botched the dask_jobqueue.SLURMCluster specification.
More strangeness. I dropped the number of active Dask workers and nodes back to smaller numbers. The error transformed into:
07:23:30.229 | ERROR   | Flow run 'natural-dogfish' - Finished in state Failed('Flow run encountered an exception. ValueError: Failed state result was an iterable of states but none were failed.\n')
Traceback (most recent call last):
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/bin/process_holography.py", line 530, in <module>
    sys.exit(cli())
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/bin/process_holography.py", line 526, in cli
    main(args.workdir, args.sbid, grid_ant=args.ant, cluster=args.cluster)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/flows.py", line 442, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 157, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 237, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/bin/process_holography.py", line 461, in main
    holography_flow()
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/flows.py", line 442, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 161, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 520, in create_and_begin_subflow_run
    return await terminal_state.result(fetch=True)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/states.py", line 318, in get_state_exception
    raise ValueError(
ValueError: Failed state result was an iterable of states but none were failed.
Everything is running and looks to be doing the right thing. I am seeing warnings in my SLURM logs from the Dask nanny about using more than 80% of memory. I will have to take a closer look at these SLURM logs to make more sense of it.

Mathijs Carlu

11/16/2022, 1:37 PM
That's weird indeed, can't give you any pointers on that one. Actually, may I ask why you are using submit in a list comprehension, rather than .map() ?

Tim Galvin

11/16/2022, 1:47 PM
As I understand .map(), the function being mapped needs to have that async def declaration. That is basically it. I am that petty. 🙂

Mathijs Carlu

11/16/2022, 2:04 PM
I don't think it has to, though? The most basic example in the docs seems to say it can just be a sync function...

Tim Galvin

11/16/2022, 2:14 PM
Well, look at that. You are right: https://docs.prefect.io/concepts/tasks/?h=map#map Wherever did I get that locked into my head? I swear I was reading it somewhere.
Now I look silly. Even sillier, I seemingly have it working. I made no changes to the actual pipeline code. The only thing I changed was my dask_jobqueue.SLURMCluster resource requirements.
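The thread does not say which resource settings were changed. One hedged way to reason about the earlier memory warnings: dask_jobqueue divides a job's memory evenly among its worker processes, and Dask compares each worker's usage against fractions of that per-worker limit. The sketch below works through the arithmetic with made-up numbers (64 GB per job, 8 processes). The 0.80 and 0.95 fractions are assumed to be Dask's default pause and terminate thresholds, and the function names are hypothetical.

```python
def per_worker_memory_gb(job_memory_gb, processes):
    # dask_jobqueue splits the per-job memory evenly among worker processes.
    return job_memory_gb / processes


def thresholds_gb(worker_limit_gb, pause=0.80, terminate=0.95):
    # Fractions are assumed Dask defaults; check your own configuration.
    return {
        "pause": worker_limit_gb * pause,
        "terminate": worker_limit_gb * terminate,
    }


# Made-up example values, not the settings used in this thread.
limit = per_worker_memory_gb(job_memory_gb=64, processes=8)
print(limit)  # 8.0
print(thresholds_gb(limit))
```

Under these assumptions, requesting more memory per job or running fewer processes per job raises the per-worker limit, which is one plausible reason a change to the SLURMCluster resources could make the warnings go away.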