merlin
11/10/2022, 4:46 AM
… `ValueError` to trigger a failure, and that's how to manage the state of subsequent tasks or flows.
Here's the thing: I don't want to see a stack trace unless it is some new failure that I need to troubleshoot. For the expected exceptions, I want to post an error to the log and proceed with my flow according to logic I design.
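A minimal, framework-agnostic sketch of that policy using only the standard library: exceptions listed as "expected" are logged as a single error line (no stack trace) and turned into a `False` result, while any other exception propagates with its full traceback. The helper name `call_expecting` and the default of the builtin `ConnectionError` are assumptions for illustration, not Prefect API. Inside a Prefect task you would presumably log through `get_run_logger()` instead of `logging.getLogger`. Note that `requests.exceptions.ConnectionError` is not a subclass of the builtin `ConnectionError`, so with `requests` you would pass that type explicitly.

```python
import logging
from typing import Any, Callable, Tuple, Type

logger = logging.getLogger("expected_failures")


def call_expecting(
    fn: Callable[[], Any],
    expected: Tuple[Type[BaseException], ...] = (ConnectionError,),
) -> Tuple[bool, Any]:
    """Hypothetical helper: run fn(), treating `expected` exceptions as a normal outcome.

    Returns (True, value) on success and (False, None) on an expected failure.
    Any other exception is deliberately not caught, so it keeps its full traceback.
    """
    try:
        return True, fn()
    except expected as exc:
        # One log line without exc_info, so no stack trace for a known failure mode.
        logger.error("Expected failure in %s: %s", getattr(fn, "__name__", repr(fn)), exc)
        return False, None


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    def flaky() -> str:
        raise ConnectionError("no connection available")

    ok, _ = call_expecting(flaky)
    print("service reachable" if ok else "proceeding with fallback logic")
```

The caller then branches on the boolean, which keeps "expected failure" handling in ordinary control flow rather than in exception or state machinery.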
I'm having trouble returning a manual state of `Cancelled`; a concrete example and logs are posted in the thread. Can anyone guide me toward cancelling the entire flow without raising an exception?
… `Cancelled(message="No connection available")`.
Unfortunately, as you can see in the logs, a `ValueError` is raised by the `get_state_exception` call. What I want is for the whole flow to be cancelled.
```python
import requests
from requests.exceptions import ConnectionError
from prefect import task, flow, get_run_logger
from prefect.orion.schemas.states import Cancelled

SERVICE_URL = "https://service-x.example.com"  # placeholder; the real URL was not shared

@task
def wait_for_service_X():
    logger = get_run_logger()
    logger.info("Try connect to service...")
    latest_exception = None  # avoids a NameError if no attempt raised
    for i in range(1, 3600):
        try:
            if requests.get(SERVICE_URL).ok:
                return "Service Connected"
        except ConnectionError as ex:
            latest_exception = ex
    logger.error(f"{latest_exception}")
    # Returning a state from a task sets that task run's state, not the flow's.
    return Cancelled(message=f"Service connection failed after {i} attempts")

@task
def the_rest_of_the_tasks():
    return "This task represents all next tasks that shouldn't run."

@flow
def test_flow():
    state = wait_for_service_X(return_state=True)
    print(f"result: {state}")
    the_rest_of_the_tasks(wait_for=[state])

if __name__ == "__main__":
    test_flow()
```
```
poetry run python src/tasks/wait_service.py
20:34:51.946 | INFO | prefect.engine - Created flow run 'cocky-gibbon' for flow 'test-flow'
20:34:52.009 | INFO | Flow run 'cocky-gibbon' - Created task run 'wait_for_service_X-828f1dbb-0' for task 'wait_for_service_X'
20:34:52.009 | INFO | Flow run 'cocky-gibbon' - Executing 'wait_for_service_X-828f1dbb-0' immediately...
20:34:52.029 | INFO | Task run 'wait_for_service_X-828f1dbb-0' - Try connect to trino server...
20:34:58.430 | INFO | Task run 'wait_for_service_X-828f1dbb-0' - Service connection failed after 2 attempts.
20:34:58.432 | ERROR | Task run 'wait_for_service_X-828f1dbb-0' - HTTPSConnectionPool(host='trino-adhoc.corp.zacs-prod.zg-int.net', port=443): Max retries exceeded with url: / (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x105b9b430>, 'Connection to trino-adhoc.corp.zacs-prod.zg-int.net timed out. (connect timeout=1)'))
20:34:58.432 | ERROR | Task run 'wait_for_service_X-828f1dbb-0' - {'response': None, 'request': <PreparedRequest [GET]>}
20:34:58.463 | ERROR | Task run 'wait_for_service_X-828f1dbb-0' - Finished in state Cancelled('Service connection failed after 2 attempts')
result: Cancelled('Service connection failed after 2 attempts')
20:34:58.484 | INFO | Flow run 'cocky-gibbon' - Created task run 'the_rest_of_the_tasks-daa0f3b5-0' for task 'the_rest_of_the_tasks'
20:34:58.484 | INFO | Flow run 'cocky-gibbon' - Executing 'the_rest_of_the_tasks-daa0f3b5-0' immediately...
20:34:58.510 | INFO | Task run 'the_rest_of_the_tasks-daa0f3b5-0' - This task represents all next tasks that shouldn't run.
20:34:58.521 | INFO | Task run 'the_rest_of_the_tasks-daa0f3b5-0' - Finished in state Completed()
20:34:58.534 | ERROR | Flow run 'cocky-gibbon' - Finished in state Failed('0/2 states are not final.')
Traceback (most recent call last):
  File "HOME/zillpo/bssgeb/src/tasks/wait_service.py", line 39, in <module>
    test_flow()
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 442, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 157, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 237, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/states.py", line 318, in get_state_exception
    raise ValueError(
ValueError: Failed state result was an iterable of states but none were failed.
```
Khuyen Tran
11/10/2022, 4:17 PM
Mason Menges
11/10/2022, 4:30 PM
merlin
11/10/2022, 5:55 PM
If not given a FAILED or CRASHED state, this raises a `ValueError`.
If the state result is a state, its exception will be returned.
If the state result is an iterable of states, the exception of the first failure will be returned.
If the state result is a string, a wrapper exception will be returned with the string as the message.
If the state result is null, a wrapper exception will be returned with the state message attached.
If the state result is not of a known type, a `TypeError` will be returned.
When a wrapper exception is returned, the type will be `FailedRun` if the state type is FAILED or `CrashedRun` if the state type is CRASHED.
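To see why the traceback ends in `ValueError`, here is a toy, stdlib-only model of the rules quoted above. `State`, `StateType`, `FailedRun` and `CrashedRun` are hypothetical stand-ins, not Prefect's classes, and the sketch covers only the quoted rules (treating both FAILED and CRASHED children as "failed" is an assumption). Feeding it a FAILED flow state whose result is `[Cancelled, Completed]`, like the run in the logs, hits the "iterable of states but none were failed" branch.

```python
from collections.abc import Iterable
from dataclasses import dataclass
from enum import Enum
from typing import Any


class StateType(Enum):
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CRASHED = "CRASHED"
    CANCELLED = "CANCELLED"


@dataclass
class State:
    """Toy stand-in for a Prefect state; not Prefect's class."""
    type: StateType
    result: Any = None
    message: str = ""


class FailedRun(Exception):
    """Stand-in wrapper for FAILED states."""


class CrashedRun(Exception):
    """Stand-in wrapper for CRASHED states."""


# Assumption: both FAILED and CRASHED children count as a "failure".
FAILURE_TYPES = (StateType.FAILED, StateType.CRASHED)


def get_state_exception(state: State) -> BaseException:
    # Only FAILED or CRASHED states are accepted at all.
    if state.type not in FAILURE_TYPES:
        raise ValueError(f"Expected a failed or crashed state, got {state.type.name}")
    wrapper = FailedRun if state.type is StateType.FAILED else CrashedRun
    result = state.result
    if isinstance(result, State):
        # Nested state: return its exception.
        return get_state_exception(result)
    if isinstance(result, str):
        # Checked before Iterable because str is itself iterable.
        return wrapper(result)
    if result is None:
        return wrapper(state.message)
    if isinstance(result, Iterable):
        for sub in result:
            if isinstance(sub, State) and sub.type in FAILURE_TYPES:
                return get_state_exception(sub)
        # A Cancelled task plus a Completed task lands here: nothing "failed".
        raise ValueError("Failed state result was an iterable of states but none were failed.")
    # Unknown result type: the TypeError is returned, not raised.
    return TypeError(f"Unexpected result for failed state: {result!r}")
```

In this model a `Cancelled` child never counts as a failure, which is consistent with the thread's conclusion that a task returning `Cancelled` does not carry through to cancel the flow.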
Khuyen: It seems like what you need is a task-level timeout. Is that correct?
Actually, the task is going to retry a number of times at 5-minute intervals. Now that I know passing `Cancelled` back to the flow won't help, I'm returning `True`/`False`, which will guide the logic in the calling flow.
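A stdlib-only sketch of a polling check that returns `True`/`False` instead of a state, so the calling flow owns the decision. The name `wait_for_service`, the `retry_on` parameter, and the defaults (12 attempts at 300 s, loosely echoing the "5 min intervals"; the attempt count is an assumption) are illustrative. `sleep` is injectable only so the function can be tested without waiting. With `requests`, `check` might be `lambda: requests.get(SERVICE_URL, timeout=5).ok` together with `retry_on=(requests.exceptions.ConnectionError,)`, since that class does not inherit from the builtin `ConnectionError`.

```python
import time
from typing import Callable, Tuple, Type


def wait_for_service(
    check: Callable[[], bool],
    attempts: int = 12,
    delay_seconds: float = 300.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Hypothetical helper: poll `check` until it succeeds or attempts run out.

    Returns True as soon as check() returns True, False if every attempt fails.
    Exceptions in `retry_on` count as a failed attempt; anything else propagates.
    """
    for attempt in range(1, attempts + 1):
        try:
            if check():
                return True
        except retry_on:
            pass  # expected: the service is simply not up yet
        if attempt < attempts:
            sleep(delay_seconds)  # no pointless wait after the final attempt
    return False
```

Returning a plain boolean keeps the task in an ordinary Completed state either way; whether the run ends up Cancelled is then decided explicitly in the flow body.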
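```python
from prefect import flow
from prefect.orion.schemas.states import Cancelled

@flow
def test_flow():
    # VPN_connected() and the_rest_of_the_tasks() are defined elsewhere in the
    # poster's code; VPN_connected() returns True/False rather than a state.
    if VPN_connected():
        the_rest_of_the_tasks()
    else:
        # Returning a Cancelled state from the flow cancels the flow run itself.
        return Cancelled(message="VPN connection failed, cancelling flow")
```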
This works great: the flow gets cancelled and shows up in the UI as a grey flow run, signifying an expected outcome when calling a flaky service. The next day's run will catch everything up, so there is no need to troubleshoot a failure.
Zanie
11/11/2022, 1:24 AM
`Failed(name="Skipped", message="This shouldn't run for ___ reason")`
merlin
11/11/2022, 1:28 AM
Zanie
11/11/2022, 1:39 AM