https://prefect.io logo
Title
m

merlin

11/10/2022, 4:46 AM
Returning Manual State from a Task I want to run a scheduled flow that deals with an intermittent service, and I want to handle the exceptions gracefully. The common idiom seems to be raising
ValueError
to trigger a failure, and thats how to manage the state of subsequent tasks or flows. Here's the thing: I don't want to see a stack trace unless it is some new failure that I need to troubleshoot. For the expected exceptions I want to post an error the log and proceed with my flow according to logic I design. I'm having trouble returning a manual state of Cancelled, concrete example and logs posted in thread. Can anyone guide me toward causing the entire flow to be cancelled without raising an exception?
1
This task will test if a network connection is available, and is meant to halt the flow until the task gives up after X attempts. I return a manual state of
Cancelled(message = "No connection available")
. Unfortunately in the logs you can see a ValueError is raised by the
get_state_exception
call. What I want is for the whole flow to be cancelled.
import requests
from requests.exceptions import ConnectionError
from prefect import task, flow, get_run_logger
from prefect.orion.schemas.states import Cancelled

@task
def wait_for_service_X():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Try connect to service...")
    for i in range(1,3600):
        try:                
            if requests.get(SERVICE_URL).ok:
                return "Service Connected"
        except ConnectionError as ex:
            latest_exception = ex

    logger.error(f"{latest_exception}")
    return Cancelled(message=f"Service connection failed after {i} attempts")

@task
def the_rest_of_the_tasks():
    return "This task represents all next tasks that shouldn't run."

@flow
def test_flow():
    state = wait_for_service_X(return_state=True)
    print(f"result: {state}")

    the_rest_of_the_tasks(wait_for=[state])

test_flow()
And the resulting logs:
poetry run python src/tasks/wait_service.py

20:34:51.946 | INFO    | prefect.engine - Created flow run 'cocky-gibbon' for flow 'test-flow'
20:34:52.009 | INFO    | Flow run 'cocky-gibbon' - Created task run 'wait_for_service_X-828f1dbb-0' for task 'wait_for_service_X'
20:34:52.009 | INFO    | Flow run 'cocky-gibbon' - Executing 'wait_for_service_X-828f1dbb-0' immediately...
20:34:52.029 | INFO    | Task run 'wait_for_service_X-828f1dbb-0' - Try connect to trino server...
20:34:58.430 | INFO    | Task run 'wait_for_service_X-828f1dbb-0' - Service connection failed after 2 attempts.
20:34:58.432 | ERROR   | Task run 'wait_for_service_X-828f1dbb-0' - HTTPSConnectionPool(host='[<http://trino-adhoc.corp.zacs-prod.zg-int.net|trino-adhoc.corp.zacs-prod.zg-int.net>](<http://trino-adhoc.corp.zacs-prod.zg-int.net>)', port=443): Max retries exceeded with url: / (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x105b9b430>, 'Connection to [<http://trino-adhoc.corp.zacs-prod.zg-int.net|trino-adhoc.corp.zacs-prod.zg-int.net>](<http://trino-adhoc.corp.zacs-prod.zg-int.net>) timed out. (connect timeout=1)'))
20:34:58.432 | ERROR   | Task run 'wait_for_service_X-828f1dbb-0' - {'response': None, 'request': <PreparedRequest [GET]>}
20:34:58.463 | ERROR   | Task run 'wait_for_service_X-828f1dbb-0' - Finished in state Cancelled('Service connection failed after 2 attempts')
result: Cancelled('Service connection failed after 2 attempts')
20:34:58.484 | INFO    | Flow run 'cocky-gibbon' - Created task run 'the_rest_of_the_tasks-daa0f3b5-0' for task 'the_rest_of_the_tasks'
20:34:58.484 | INFO    | Flow run 'cocky-gibbon' - Executing 'the_rest_of_the_tasks-daa0f3b5-0' immediately...
20:34:58.510 | INFO    | Task run 'the_rest_of_the_tasks-daa0f3b5-0' - This task represents all next tasks that shouldn't run.
20:34:58.521 | INFO    | Task run 'the_rest_of_the_tasks-daa0f3b5-0' - Finished in state Completed()
20:34:58.534 | ERROR   | Flow run 'cocky-gibbon' - Finished in state Failed('0/2 states are not final.')
Traceback (most recent call last):
  File "HOME/zillpo/bssgeb/src/tasks/wait_service.py", line 39, in <module>
    test_flow()
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 442, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 157, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 237, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/states.py", line 318, in get_state_exception
    raise ValueError(
ValueError: Failed state result was an iterable of states but none were failed.
k

Khuyen Tran

11/10/2022, 4:17 PM
It seems like what you need is a task-level timeout. Is that correct?
🙏 1
m

Mason Menges

11/10/2022, 4:30 PM
Hey @merlin It looks Cancelled isn't being treated as a terminal state in your example which sounds like a bug, would you be will to open an issue here https://github.com/PrefectHQ/prefect/issues and include your example, for what it's worth running the same code returning a Completed/Failed/Crashed state behaves as I'd expect it too.
m

merlin

11/10/2022, 5:55 PM
I will open an issue, thanks for the reply. It seems to be expected behavior though? According to the aggregation rules for states, here in the documentation below. IMO those rules about combinations of states deserve more attention and probably a way for users to set a simple set of rules. For example, if a task get Cancelled, and it is a dependency, then the Cancelled state should roll up the chain and Cancel the whole flow, depending on the remaining dependencies. States, in doc for get_state_exception
If not given a FAILED or CRASHED state, this raise a value error.

If the state result is a state, its exception will be returned.

If the state result is an iterable of states, the exception of the first failure will be returned.

If the state result is a string, a wrapper exception will be returned with the string as the message.

If the state result is null, a wrapper exception will be returned with the state message attached.

If the state result is not of a known type, a TypeError will be returned.

When a wrapper exception is returned, the type will be FailedRun if the state type is FAILED or a CrashedRun if the state type is CRASHED.
Yeah the more I think about it this rule about raising an error if not FAILED/CRASHED needs a look. At a minimum, CANCELLED should be treated the same.
Khuyen: It seems like what you need is a task-level timeout. Is that correct?
Actually, the task is going to retry a bunch of times at 5 min intervals. Now that I know passing
Cancelled
back to the flow wont help, I'm returning
True/False
which will guide the logical process in the calling flow.
@flow
def test_flow():
    if VPN_connected():
        the_rest_of_the_tasks()
    else:
        return Cancelled(message="VPN connection failed, cancelling flow")
So this works great, the flow gets cancelled and shows up in the UI as a grey flow run to signify an expected result calling a flaky service. The next day's job run will catch everything up so there is no need to troubleshoot a failure.
@Mason Menges There is an open issue with the same thing: https://github.com/PrefectHQ/prefect/issues/6789 returning Cancelled from within a task raises TypeError
z

Zanie

11/11/2022, 1:24 AM
We will tackle this when we add first-class cancellation in the next month. These rules do not handle cancellation right now because we do not have a cancellation mechanic yet. You can use a named failed state for now if you would like e.g.
Failed(name="Skipped", message="This shouldn't run for ___ reason")
:gratitude-thank-you: 3
m

merlin

11/11/2022, 1:28 AM
Wild, ok this is great. BTW i love using Prefect.
:gratitude-thank-you: 2
🙌 3
z

Zanie

11/11/2022, 1:39 AM
You can even name it “Cancelled” if you want 😉 the state type is the primary determinant for orchestration behavior and names can be used to extend states to display information relevant to your organization.
Thanks! We love hearing that 🙂