Malthe Karbo
05/16/2022, 7:32 PMfuture.get_state()
, a 403/timeout response will cause flows to fail - is there a way to see api limits for prefect orion cloud? We get forbidden (403) occasionally - it appears to be due to throttlingZanie
05/16/2022, 8:09 PMMalthe Karbo
05/16/2022, 8:41 PMZanie
05/16/2022, 10:57 PMMalthe Karbo
05/17/2022, 6:03 AMZanie
05/19/2022, 5:40 PMZach Angell
05/19/2022, 6:46 PMfuture.get_state
calling GET /task_runs/{id}
many times
And yes as Michael mentioned traceback and timestamps would be helpful if you can share themMalthe Karbo
05/19/2022, 7:44 PMfuture.get_state()
- unfortunately we don't have the logs as we don't keep them in dev
It was consistently failing almost every flow - can you check traces / logs on your end for our account?ConnecTimeout
when running future.get_result()
with exponential back-off, 5 retries
Caught exception trying to get state from prefect api: RetryError[<Future at 0x7fcfbe533b80 state=finished raised ConnectTimeout>]
Traceback (most recent call last):
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_core/_sockets.py", line 186, in connect_tcp
addr_obj = ip_address(remote_host)
File "/usr/local/lib/python3.9/ipaddress.py", line 53, in ip_address
raise ValueError(f'{address!r} does not appear to be an IPv4 or IPv6 address')
ValueError: '<http://api-beta.prefect.io|api-beta.prefect.io>' does not appear to be an IPv4 or IPv6 address
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 101, in connect_tcp
stream: anyio.abc.ByteStream = await anyio.connect_tcp(
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_core/_sockets.py", line 189, in connect_tcp
gai_res = await getaddrinfo(
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
yield
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 101, in connect_tcp
stream: anyio.abc.ByteStream = await anyio.connect_tcp(
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
raise TimeoutError
TimeoutError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
raise exc
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
stream = await self._connect(request)
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection.py", line 111, in _connect
stream = await self._network_backend.connect_tcp(**kwargs)
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/backends/auto.py", line 23, in connect_tcp
return await self._backend.connect_tcp(
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 101, in connect_tcp
stream: anyio.abc.ByteStream = await anyio.connect_tcp(
File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
raise to_exc(exc)
httpcore.ConnectTimeout
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
result = fn(*args, **kwargs)
File "/pipelines/.../io_utils.py", line 47, in check_state
state = future.get_state()
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 243, in get_state
return cast(State[R], sync(self._get_state, client=client))
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/client.py", line 95, in with_injected_client
return await fn(*args, **kwargs)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 249, in _get_state
task_run = await client.read_task_run(self.run_id)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/client.py", line 1498, in read_task_run
response = await self._client.get(f"/task_runs/{task_run_id}")
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1729, in get
return await self.request(
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1506, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/client.py", line 212, in send
response = await super().send(*args, **kwargs)
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1593, in send
response = await self._send_handling_auth(
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1621, in _send_handling_auth
response = await self._send_handling_redirects(
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1658, in _send_handling_redirects
response = await self._send_single_request(request)
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1695, in _send_single_request
response = await transport.handle_async_request(request)
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.ConnectTimeout
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/pipelines/.../io_utils.py", line 61, in as_completed
if check_state(future):
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 324, in wrapped_f
return self(f, *args, **kw)
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 404, in __call__
do = self.iter(retry_state=retry_state)
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 361, in iter
raise retry_exc from fut.exception()
tenacity.RetryError: RetryError[<Future at 0x7fcfbe533b80 state=finished raised ConnectTimeout>]
I can't share the flow code in more detail on here, but we use the implementation from the as_completed thread here on slack, will paste belowfrom typing import Any, TypeVar, Iterable, Generator
from prefect.futures import PrefectFuture
from prefect.utilities.asyncio import Sync
from tenacity import retry
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_exponential
T = TypeVar("T")
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=0, max=30))
def check_state(future: PrefectFuture[Any, Sync]) -> bool:
state = future.get_state()
return state.is_completed()
def as_completed(
futures: Iterable[PrefectFuture[T, Sync]]
) -> Generator[PrefectFuture[T, Sync], None, None]:
logger = get_run_logger()
_futures = set(futures)
completed = set()
while len(completed) < len(_futures):
for future in _futures.difference(completed):
time.sleep(0.1)
try:
if check_state(future):
completed.add(future)
yield future
except Exception as e:
logger.exception(
f"🤬 Caught exception trying to get state from prefect api: {e}"
)
raise e
This is the implementation used for as completed and
from typing import TypeVar
from tenacity import retry
from tenacity.wait import wait_exponential
from tenacity.stop import stop_after_attempt
from prefect.futures import PrefectFuture
from prefect.utilities.asyncio import Sync
T = TypeVar("T")
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=0, max=30),
)
def future_result(future: PrefectFuture[T, Sync]) -> T:
return future.result()
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=0, max=30),
)
def future_wait(future: PrefectFuture[T, Sync]):
future.wait()
These are the small utils for wait and direct result fetching used in flows. We use these to wrap futures from tasksEncountered exception during execution:
Traceback (most recent call last):
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
result = fn(*args, **kwargs)
File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 48, in future_result
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 208, in result
return sync(
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 219, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 138, in result
raise data
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
result = fn(*args, **kwargs)
File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 48, in future_result
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 208, in result
return sync(
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 219, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 138, in result
raise data
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
result = fn(*args, **kwargs)
File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 48, in future_result
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 208, in result
return sync(
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 219, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 138, in result
raise data
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
result = fn(*args, **kwargs)
File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 48, in future_result
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 208, in result
return sync(
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 219, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 138, in result
raise data
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
result = fn(*args, **kwargs)
File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 48, in future_result
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 208, in result
return sync(
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 219, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 138, in result
raise data
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/task_runners.py", line 412, in wait
return await future.result(timeout=timeout)
File "/pipelines/.venv/lib/python3.9/site-packages/distributed/client.py", line 294, in _result
raise exc.with_traceback(tb)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/engine.py", line 716, in begin_task_run
return task_run.state
File "/usr/local/lib/python3.9/contextlib.py", line 670, in __aexit__
raise exc_details[1]
File "/usr/local/lib/python3.9/contextlib.py", line 653, in __aexit__
cb_suppress = await cb(*exc_details)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/client.py", line 1824, in __aexit__
return await self._exit_stack.__aexit__(*exc_info)
File "/usr/local/lib/python3.9/contextlib.py", line 670, in __aexit__
raise exc_details[1]
File "/usr/local/lib/python3.9/contextlib.py", line 653, in __aexit__
cb_suppress = await cb(*exc_details)
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1975, in __aexit__
await self._transport.__aexit__(exc_type, exc_value, traceback)
File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_transports/default.py", line 332, in __aexit__
await self._pool.__aexit__(exc_type, exc_value, traceback)
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 326, in __aexit__
await self.aclose()
File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
raise RuntimeError(
RuntimeError: The connection pool was closed while 1 HTTP requests/responses were still in-flight.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/engine.py", line 468, in orchestrate_flow_run
result = await run_sync_in_worker_thread(flow_call)
File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 222, in <...>_flow
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 324, in wrapped_f
return self(f, *args, **kw)
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 404, in __call__
do = self.iter(retry_state=retry_state)
File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 361, in iter
raise retry_exc from fut.exception()
tenacity.RetryError: RetryError[<Future at 0x7f22b779c430 state=finished raised RuntimeError>]
Zanie
05/24/2022, 6:37 PM