<@ULVA73B9P> - Server runs a single `async` flow ...
# ask-marvin
j
@Marvin
• The server runs a single `async` flow each time a user requests a run.
• User requests can be accepted simultaneously, so the flow can execute concurrently.
• Problem: a flow run started by another user crashes when the server finishes a previous flow run.
Crash details:
Traceback (most recent call last):
  File "/home/janghoo/dev/venvs/evaluable/prefect-server/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 326, in aresult
    return await asyncio.wrap_future(self.future)
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/janghoo/dev/venvs/evaluable/prefect-server/lib/python3.8/site-packages/prefect/engine.py", line 2295, in report_flow_run_crashes
    yield
  File "/home/janghoo/dev/venvs/evaluable/prefect-server/lib/python3.8/site-packages/prefect/engine.py", line 553, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
  File "/home/janghoo/dev/venvs/evaluable/prefect-server/lib/python3.8/site-packages/prefect/engine.py", line 863, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/janghoo/dev/venvs/evaluable/prefect-server/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 328, in aresult
    raise CancelledError() from exc
prefect._internal.concurrency.cancellation.CancelledError
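The first frame of the traceback awaits `asyncio.wrap_future(self.future)`, and the second re-raises a library-specific error `from exc`. The sketch below reproduces that pattern with the standard library only, as a rough illustration of how a cancelled `concurrent.futures.Future` can surface as a chained exception. `WrappedCancelledError`, `aresult`, and `main` are hypothetical names used for illustration; this is not Prefect's implementation and it does not show why the future was cancelled in the first place.

```python
import asyncio
import concurrent.futures


class WrappedCancelledError(Exception):
    """Hypothetical stand-in for a library-specific cancellation error."""


async def aresult(future: concurrent.futures.Future):
    # Awaiting a wrapped future that was cancelled raises asyncio.CancelledError.
    try:
        return await asyncio.wrap_future(future)
    except asyncio.CancelledError as exc:
        # Re-raise with explicit chaining, which produces the
        # "direct cause of the following exception" line in a traceback.
        raise WrappedCancelledError() from exc


async def main() -> WrappedCancelledError:
    future = concurrent.futures.Future()
    future.cancel()  # simulate something else cancelling the underlying call
    try:
        await aresult(future)
    except WrappedCancelledError as err:
        return err
    raise AssertionError("expected the cancellation to surface")


error = asyncio.run(main())
```

The point of the sketch is only that the awaiting code sees a cancellation it did not request: whoever cancels the underlying future decides the outcome.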
Server:
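import asyncio

async def eval_request(
    # ....
):
    # Schedule the flow as a task on the server's event loop.
    result_resource = asyncio.create_task(
        prefect_eval_resource(
            task,
            do_eval_flops_params,
            do_eval_latency_on_cpu,
            do_eval_latency_on_gpu,
        )
    )
    # A Task object is always truthy, so this check always passes.
    if result_resource:
        result_resource = await result_resource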
Flow:
from prefect import flow
from prefect.deployments import run_deployment

@flow(log_prints=True)
async def prefect_eval_resource(
    ...
) -> dict:
        gpu_latency_response = run_deployment(
            ...
            as_subflow=True,
            timeout=None,
        )

        cpu_latency_response = run_deployment(
            ...
            as_subflow=True,
            timeout=None,
        )

        # ...
        response = await gpu_latency_response
        latency_gpu = await response.state.result().get()
        # ...

        # ...
        response = await cpu_latency_response
        latency_cpu = await response.state.result().get()
        # ...

        return {
            'cpu': latency_cpu,
            'gpu': latency_gpu,
        }
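One detail in the flow above may be worth checking: both `run_deployment(...)` calls are made without `await` and are only awaited later. Assuming `run_deployment` returns an ordinary coroutine when called inside an async flow (the later `await` suggests it does), a coroutine object does not start running until it is awaited, so the two deployments would run one after the other. The sketch below shows the difference using plain `asyncio`; `fake_deployment` is a hypothetical stand-in and nothing here calls Prefect.

```python
import asyncio


async def fake_deployment(name: str, log: list) -> str:
    # Hypothetical stand-in for a long-running deployment call.
    log.append(f"start {name}")
    await asyncio.sleep(0.01)
    log.append(f"end {name}")
    return name


async def awaited_one_by_one() -> list:
    log = []
    gpu = fake_deployment("gpu", log)  # coroutine object only, nothing runs yet
    cpu = fake_deployment("cpu", log)
    await gpu  # runs to completion before cpu starts
    await cpu
    return log


async def awaited_together() -> list:
    log = []
    # gather wraps each coroutine in a task, so both start before either ends.
    await asyncio.gather(fake_deployment("gpu", log), fake_deployment("cpu", log))
    return log


sequential_log = asyncio.run(awaited_one_by_one())
concurrent_log = asyncio.run(awaited_together())
```

If the deployments are meant to overlap, `asyncio.gather` or `asyncio.create_task` is the usual way to get that in plain `asyncio`; whether this interacts with the reported crash is not established here.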
m
It seems you're experiencing concurrency issues with flows leading to `CancelledError`. It could be related to orchestration behavior when dealing with subflows and concurrent execution. Here are some pointers based on the Prefect documentation and GitHub issues:
1. Global Concurrency Limits: Make sure your server is configured to not exceed the maximum number of concurrent flows that it can handle. Details on concurrency limits can be found in the Prefect documentation.
2. Exception Handling: The crashes might be due to issues in exception handling or task outcomes not being awaited correctly. Ensure tasks and subflows are orchestrated correctly and that exceptions are handled to prevent crashes from affecting other flow runs.
3. GitHub Issues: There are reports of similar `CancelledError` issues in Prefect, which could be related to task runners and their configuration, or potential bugs in Prefect's concurrency management. Here are the related GitHub issues you can explore and check for any resolutions or workarounds:
- Occasional http2 connection errors (KeyError)
- Flow state is Completed, but tasks are crashed
- Nested blocks lead to TimeoutError when loaded in task
Be sure to check for any insights or solutions provided in these issues, as they might be directly applicable to your scenario. Additionally, consider checking your system for any external factors that might be causing the cancellation, such as resource limitations or network issues. If none of these solutions work, you may need to create a new issue in the Prefect GitHub repository to seek help from the maintainers.
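As a rough illustration of the exception-handling pointer, here is a minimal sketch of keeping one request's failure from affecting the others when several are awaited together. It uses only `asyncio`; `handle_request` and `serve_all` are hypothetical names, and the sketch does not claim to address the root cause of the `CancelledError` reported above.

```python
import asyncio


async def handle_request(user: str, fail: bool) -> dict:
    # Hypothetical per-user handler; a real one would start the flow run.
    await asyncio.sleep(0)
    if fail:
        raise RuntimeError(f"evaluation failed for {user}")
    return {"user": user, "status": "ok"}


async def serve_all() -> list:
    # return_exceptions=True collects each failure as a result instead of
    # propagating the first one to the caller.
    return await asyncio.gather(
        handle_request("alice", fail=False),
        handle_request("bob", fail=True),
        return_exceptions=True,
    )


results = asyncio.run(serve_all())
```

Each entry in `results` is either the handler's return value or the exception it raised, so the caller can report failures per user.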