merlin
11/10/2022, 4:46 AMValueErrormerlin
11/10/2022, 4:49 AMCancelled(message = "No connection available")get_state_exceptionmerlin
11/10/2022, 4:54 AMimport requests
from requests.exceptions import ConnectionError
from prefect import task, flow, get_run_logger
from prefect.orion.schemas.states import Cancelled
@task
def wait_for_service_X():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Try connect to service...")
    for i in range(1,3600):
        try:                
            if requests.get(SERVICE_URL).ok:
                return "Service Connected"
        except ConnectionError as ex:
            latest_exception = ex
    logger.error(f"{latest_exception}")
    return Cancelled(message=f"Service connection failed after {i} attempts")
@task
def the_rest_of_the_tasks():
    return "This task represents all next tasks that shouldn't run."
@flow
def test_flow():
    state = wait_for_service_X(return_state=True)
    print(f"result: {state}")
    the_rest_of_the_tasks(wait_for=[state])
test_flow()merlin
11/10/2022, 4:57 AMpoetry run python src/tasks/wait_service.py
20:34:51.946 | INFO    | prefect.engine - Created flow run 'cocky-gibbon' for flow 'test-flow'
20:34:52.009 | INFO    | Flow run 'cocky-gibbon' - Created task run 'wait_for_service_X-828f1dbb-0' for task 'wait_for_service_X'
20:34:52.009 | INFO    | Flow run 'cocky-gibbon' - Executing 'wait_for_service_X-828f1dbb-0' immediately...
20:34:52.029 | INFO    | Task run 'wait_for_service_X-828f1dbb-0' - Try connect to trino server...
20:34:58.430 | INFO    | Task run 'wait_for_service_X-828f1dbb-0' - Service connection failed after 2 attempts.
20:34:58.432 | ERROR   | Task run 'wait_for_service_X-828f1dbb-0' - HTTPSConnectionPool(host='[<http://trino-adhoc.corp.zacs-prod.zg-int.net|trino-adhoc.corp.zacs-prod.zg-int.net>](<http://trino-adhoc.corp.zacs-prod.zg-int.net>)', port=443): Max retries exceeded with url: / (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x105b9b430>, 'Connection to [<http://trino-adhoc.corp.zacs-prod.zg-int.net|trino-adhoc.corp.zacs-prod.zg-int.net>](<http://trino-adhoc.corp.zacs-prod.zg-int.net>) timed out. (connect timeout=1)'))
20:34:58.432 | ERROR   | Task run 'wait_for_service_X-828f1dbb-0' - {'response': None, 'request': <PreparedRequest [GET]>}
20:34:58.463 | ERROR   | Task run 'wait_for_service_X-828f1dbb-0' - Finished in state Cancelled('Service connection failed after 2 attempts')
result: Cancelled('Service connection failed after 2 attempts')
20:34:58.484 | INFO    | Flow run 'cocky-gibbon' - Created task run 'the_rest_of_the_tasks-daa0f3b5-0' for task 'the_rest_of_the_tasks'
20:34:58.484 | INFO    | Flow run 'cocky-gibbon' - Executing 'the_rest_of_the_tasks-daa0f3b5-0' immediately...
20:34:58.510 | INFO    | Task run 'the_rest_of_the_tasks-daa0f3b5-0' - This task represents all next tasks that shouldn't run.
20:34:58.521 | INFO    | Task run 'the_rest_of_the_tasks-daa0f3b5-0' - Finished in state Completed()
20:34:58.534 | ERROR   | Flow run 'cocky-gibbon' - Finished in state Failed('0/2 states are not final.')
Traceback (most recent call last):
  File "HOME/zillpo/bssgeb/src/tasks/wait_service.py", line 39, in <module>
    test_flow()
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 442, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 157, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 237, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "HOME/Library/Caches/pypoetry/virtualenvs/bssgeb-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/states.py", line 318, in get_state_exception
    raise ValueError(
ValueError: Failed state result was an iterable of states but none were failed.Khuyen Tran
11/10/2022, 4:17 PMMason Menges
11/10/2022, 4:30 PMmerlin
11/10/2022, 5:55 PMIf not given a FAILED or CRASHED state, this raise a value error.
If the state result is a state, its exception will be returned.
If the state result is an iterable of states, the exception of the first failure will be returned.
If the state result is a string, a wrapper exception will be returned with the string as the message.
If the state result is null, a wrapper exception will be returned with the state message attached.
If the state result is not of a known type, a TypeError will be returned.
When a wrapper exception is returned, the type will be FailedRun if the state type is FAILED or a CrashedRun if the state type is CRASHED.merlin
11/10/2022, 5:57 PMmerlin
11/10/2022, 10:42 PMKhuyen: It seems like what you need is a task-level timeout. Is that correct?Actually, the task is going to retry a bunch of times at 5 min intervals. Now that I know passing
CancelledTrue/False@flow
def test_flow():
    if VPN_connected():
        the_rest_of_the_tasks()
    else:
        return Cancelled(message="VPN connection failed, cancelling flow")merlin
11/11/2022, 12:38 AMZanie
Failed(name="Skipped", message="This shouldn't run for ___ reason")merlin
11/11/2022, 1:28 AMZanie
Zanie
