Janghoo Lee
02/28/2024, 8:54 AM
• Server runs a single `async` flow when a user requests to run.
• User requests can be accepted simultaneously, so the flow can be executed concurrently.
• Problem: A flow run started by another user crashes when the server finishes a previous flow run.
Crash details:
Traceback (most recent call last):
  File "/home/janghoo/dev/venvs/evaluable/prefect-server/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 326, in aresult
    return await asyncio.wrap_future(self.future)
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/janghoo/dev/venvs/evaluable/prefect-server/lib/python3.8/site-packages/prefect/engine.py", line 2295, in report_flow_run_crashes
    yield
  File "/home/janghoo/dev/venvs/evaluable/prefect-server/lib/python3.8/site-packages/prefect/engine.py", line 553, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/home/janghoo/dev/venvs/evaluable/prefect-server/lib/python3.8/site-packages/prefect/engine.py", line 863, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/janghoo/dev/venvs/evaluable/prefect-server/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 328, in aresult
    raise CancelledError() from exc
prefect._internal.concurrency.cancellation.CancelledError
Server:
async def eval_request(...):
    # ....
    result_resource = asyncio.create_task(
        prefect_eval_resource(
            task,
            do_eval_flops_params,
            do_eval_latency_on_cpu,
            do_eval_latency_on_gpu,
        )
    )
    if result_resource:
        result_resource = await result_resource
Flow:
@flow(log_prints=True)
async def prefect_eval_resource(
    ...
) -> dict:
    gpu_latency_response = run_deployment(
        ...
        as_subflow=True,
        timeout=None,
    )
    cpu_latency_response = run_deployment(
        ...
        as_subflow=True,
        timeout=None,
    )
    # ...
    response = await gpu_latency_response
    latency_gpu = await response.state.result().get()
    # ...
    # ...
    response = await cpu_latency_response
    latency_cpu = await response.state.result().get()
    # ...
    return {
        'cpu': latency_cpu,
        'gpu': latency_gpu,
    }
Marvin
02/28/2024, 9:01 AM
The traceback ends in a `CancelledError`. It could be related to orchestration behavior when dealing with subflows and concurrent execution. Here are some pointers based on the Prefect documentation and GitHub issues:
1. Global Concurrency Limits: Make sure your server is configured to not exceed the maximum number of concurrent flows that it can handle. Details on concurrency limits can be found in the Prefect documentation.
2. Exception Handling: The crashes might be due to issues in exception handling or task outcomes not being awaited correctly. Ensure tasks and subflows are orchestrated correctly and that exceptions are handled to prevent crashes from affecting other flow runs.
3. GitHub Issues: There are reports of similar `CancelledError` issues in Prefect, which could be related to task runners and their configuration, or potential bugs in Prefect's concurrency management. Here are the related GitHub issues you can explore and check for any resolutions or workarounds:
- Occasional http2 connection errors (KeyError)
- Flow state is Completed, but tasks are crashed
- Nested blocks lead to TimeoutError when loaded in task
Be sure to check for any insights or solutions provided in these issues, as they might be directly applicable to your scenario. Additionally, consider checking your system for any external factors that might be causing the cancellation, such as resource limitations or network issues. If none of these solutions work, you may need to create a new issue in the Prefect GitHub repository to seek help from the maintainers.
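To make the exception-handling pointer more concrete, here is a minimal sketch in plain `asyncio`. It does not use Prefect and is not a diagnosis of the crash above. It only shows one way to observe the cancellation of one concurrent task without it affecting a sibling task. The names `fake_flow`, `summarize`, `user_a`, and `user_b` are hypothetical stand-ins, and the explicit `cancel()` call merely simulates whatever cancels the second run in the real server.

```python
import asyncio
from typing import Any, Dict


async def fake_flow(name: str, seconds: float) -> str:
    """Hypothetical stand-in for one user's async flow run (not Prefect code)."""
    await asyncio.sleep(seconds)
    return f"{name} completed"


def summarize(outcome: Any) -> str:
    """Turn a gather() outcome into a short status string."""
    if isinstance(outcome, asyncio.CancelledError):
        return "cancelled"
    if isinstance(outcome, Exception):
        return f"failed: {outcome!r}"
    return str(outcome)


async def main() -> Dict[str, str]:
    # Two requests arrive at about the same time; each gets its own task.
    tasks = {
        "user_a": asyncio.create_task(fake_flow("user_a", 0.01)),
        "user_b": asyncio.create_task(fake_flow("user_b", 0.20)),
    }

    # The first request finishes normally.
    await tasks["user_a"]

    # Assumption for illustration: something cancels the second run
    # right after the first one finishes.
    tasks["user_b"].cancel()

    # return_exceptions=True returns each task's exception (including
    # CancelledError) as a value instead of raising it here.
    outcomes = await asyncio.gather(*tasks.values(), return_exceptions=True)
    return {name: summarize(outcome) for name, outcome in zip(tasks, outcomes)}


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Logging per-request outcomes like this can help narrow down which request is cancelled and when, which is useful detail to include if you open a GitHub issue.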