# prefect-community
s
We are running into an issue with flow runs hanging after a task fails -- a task directly downstream from the failure ends up stuck in a Pending state even though all its upstream tasks are complete. We're using an `ECSRun` with a `LocalDaskExecutor`, and I'm unable to find any way to debug or diagnose why the task is never executed.
k
Can you try using processes instead of threads in the LocalDaskExecutor to see if that helps?
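For reference, a minimal sketch of what switching to the process-based scheduler looks like, assuming Prefect 1.x (the flow name here is hypothetical):
```python
# A minimal sketch, assuming Prefect 1.x: switch LocalDaskExecutor from its
# default threaded scheduler to separate worker processes.
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("example-flow") as flow:  # hypothetical flow name
    ...

flow.executor = LocalDaskExecutor(scheduler="processes")
```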
s
Yes, we are using processes
k
Is the flow still in a running state?
s
Yes, the flow continues running but no tasks are executing.
k
Let me check with some other team members for ideas
s
Thanks!
Note that this isn't 100% reproducible: the flow will sometimes fail correctly, but I'd estimate we see this issue more than 80% of the time.
k
Ok
This is normally more stable with processes already. Could you show me how you attached the executor? Just `flow.executor = LocalDaskExecutor()`?
s
```python
with Flow(
    "encyclopedia-flow",
    result=PrefectResult(),
    storage=STORAGE,
    run_config=RUN_CONFIG,
    executor=LocalDaskExecutor(scheduler="processes"),
) as flow:
```
k
You can try DaskExecutor, which will spin up a LocalCluster instead of a multiprocessing pool; that might be more stable.
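A minimal sketch of that swap, assuming Prefect 1.x (with no address given, DaskExecutor starts a temporary dask.distributed LocalCluster; the worker sizing below is hypothetical):
```python
# A minimal sketch, assuming Prefect 1.x: with no address, DaskExecutor starts
# a temporary dask.distributed LocalCluster instead of a multiprocessing pool.
from prefect.executors import DaskExecutor

# assumes `flow` is the Flow object defined above
flow.executor = DaskExecutor(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 1},  # hypothetical sizing
)
```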
s
Thanks, we'll take a look.
Is there any way to increase the logging of the flow container to debug the task scheduling issue?
k
Using the ECSRun config, add `PREFECT__LOGGING__LEVEL: "DEBUG"` as an environment variable.
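For example, a sketch of passing that through the run config, assuming Prefect 1.x's ECSRun (the `env` dict is forwarded to the flow's container):
```python
# A minimal sketch, assuming Prefect 1.x: set the flow container's log level
# to DEBUG via the ECSRun run config's env dict.
from prefect.run_configs import ECSRun

RUN_CONFIG = ECSRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
```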
s
Great, thanks!
@Kevin Kho I've found a workaround for this issue, which I think is the same as in this earlier thread from @Theo Platt: https://prefect-community.slack.com/archives/CL09KU1K7/p1641313339334100
k
Ohh I see
s
I don't have a completely simplified reproduction, but the issue was related to a task raising an unpickleable `WaiterError` (from `boto3`). In `AWSClientWait` the error is handled as `raise FAIL(...) from e`. This may only lead to an error in the context we're running it in (by calling `AWSClientWait.run()` from within another task), but it appears that the attempt to unpickle the exception causes the ECS task to hang without exiting or updating the flow state.
The behavior is not exactly the same when reproducing locally, but I see the following stack trace:
```
└── 15:47:21 | ERROR   | Unexpected error: TypeError("__init__() missing 1 required positional argument: 'last_response'")
Traceback (most recent call last):
  File "/home/sethjust/miniconda3/envs/seerbio-venv-py38/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/sethjust/miniconda3/envs/seerbio-venv-py38/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
    final_states = executor.wait(
  File "/home/sethjust/miniconda3/envs/seerbio-venv-py38/lib/python3.8/site-packages/prefect/executors/dask.py", line 685, in wait
    return dask.compute(
  File "/home/sethjust/miniconda3/envs/seerbio-venv-py38/lib/python3.8/site-packages/dask/base.py", line 571, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/sethjust/miniconda3/envs/seerbio-venv-py38/lib/python3.8/site-packages/dask/multiprocessing.py", line 219, in get
    result = get_async(
  File "/home/sethjust/miniconda3/envs/seerbio-venv-py38/lib/python3.8/site-packages/dask/local.py", line 508, in get_async
    res, worker_id = loads(res_info)
  File "/home/sethjust/miniconda3/envs/seerbio-venv-py38/lib/python3.8/site-packages/botocore/exceptions.py", line 28, in _exception_from_packed_args
    return exception_cls(*args, **kwargs)
TypeError: __init__() missing 1 required positional argument: 'last_response'
```
I believe that this is a bug in Prefect, possibly related to https://github.com/PrefectHQ/prefect/issues/2178
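For illustration, a hypothetical, minimal sketch of the underlying pickling problem, assuming a botocore version in which `WaiterError` does not survive a pickle round trip because the packed kwargs omit `last_response` (matching the TypeError in the trace above):
```python
# Hypothetical reproduction sketch: the exception pickles fine, but
# reconstructing it on load fails because last_response is not carried along.
import pickle

from botocore.exceptions import WaiterError

err = WaiterError(name="tasks_stopped", reason="Max attempts exceeded", last_response={})
data = pickle.dumps(err)  # pickling succeeds
pickle.loads(data)        # TypeError: __init__() missing 1 required positional argument: 'last_response'
```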