https://prefect.io logo
#prefect-community
Title
# prefect-community
m

Malthe Karbo

05/16/2022, 7:32 PM
When calling
future.get_state()
, a 403/timeout response will cause flows to fail - is there a way to see api limits for prefect orion cloud? We get forbidden (403) occasionally - it appears to be due to throttling
we have implemented some internal throttling and exponential backoff, it appears to solve it - still would be interested in knowing the limits
z

Zanie

05/16/2022, 8:09 PM
There should not be any throttling on this route, we can investigate the 403 you are receiving.
upvote 2
m

Malthe Karbo

05/16/2022, 8:41 PM
Would love that, please reach out if you need more information
z

Zanie

05/16/2022, 10:57 PM
If you can reproduce it on a local server, that’d be helpful.
m

Malthe Karbo

05/17/2022, 6:03 AM
Cant - We got it consistently on cloud though
By that I mean we can't replicate it on local server
@Zanie did you figure out anything on your end?
z

Zanie

05/19/2022, 5:40 PM
I am working on other things right now, I’ve forwarded this to our team that’s working on this but there’s nothing to report yet.
We don’t seem to have logs of the 403 response internally. Can you share a couple timestamps? A full traceback would be useful as well.
z

Zach Angell

05/19/2022, 6:46 PM
Hi @Malthe Karbo, thanks for reporting this! We're looking into it on our end. Do you have a sense for how often this happens when running against Cloud? For example, does it happen reliably each flow run? Does the flow run have to make a large number of requests (e.g.
future.get_state
calling
GET /task_runs/{id}
many times And yes as Michael mentioned traceback and timestamps would be helpful if you can share them
m

Malthe Karbo

05/19/2022, 7:44 PM
It doesn't happen anymore after we implemented backoff in
future.get_state()
- unfortunately we don't have the logs as we don't keep them in dev It was consistently failing almost every flow - can you check traces / logs on your end for our account?
We are now also receiving
ConnecTimeout
when running
future.get_result()
with exponential back-off, 5 retries
Copy code
Caught exception trying to get state from prefect api: RetryError[<Future at 0x7fcfbe533b80 state=finished raised ConnectTimeout>]
Traceback (most recent call last):
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_core/_sockets.py", line 186, in connect_tcp
    addr_obj = ip_address(remote_host)
  File "/usr/local/lib/python3.9/ipaddress.py", line 53, in ip_address
    raise ValueError(f'{address!r} does not appear to be an IPv4 or IPv6 address')
ValueError: '<http://api-beta.prefect.io|api-beta.prefect.io>' does not appear to be an IPv4 or IPv6 address

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 101, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_core/_sockets.py", line 189, in connect_tcp
    gai_res = await getaddrinfo(
asyncio.exceptions.CancelledError


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 101, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
    raise exc
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
    stream = await self._connect(request)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection.py", line 111, in _connect
    stream = await self._network_backend.connect_tcp(**kwargs)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/backends/auto.py", line 23, in connect_tcp
    return await self._backend.connect_tcp(
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 101, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
    raise to_exc(exc)
httpcore.ConnectTimeout


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
    result = fn(*args, **kwargs)
  File "/pipelines/.../io_utils.py", line 47, in check_state
    state = future.get_state()
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 243, in get_state
    return cast(State[R], sync(self._get_state, client=client))
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/client.py", line 95, in with_injected_client
    return await fn(*args, **kwargs)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 249, in _get_state
    task_run = await client.read_task_run(self.run_id)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/client.py", line 1498, in read_task_run
    response = await self._client.get(f"/task_runs/{task_run_id}")
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1729, in get
    return await self.request(
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1506, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/client.py", line 212, in send
    response = await super().send(*args, **kwargs)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1593, in send
    response = await self._send_handling_auth(
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1621, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1658, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1695, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ConnectTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/pipelines/.../io_utils.py", line 61, in as_completed
    if check_state(future):
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 324, in wrapped_f
    return self(f, *args, **kw)
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 404, in __call__
    do = self.iter(retry_state=retry_state)
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 361, in iter
    raise retry_exc from fut.exception()
tenacity.RetryError: RetryError[<Future at 0x7fcfbe533b80 state=finished raised ConnectTimeout>]
I can't share the flow code in more detail on here, but we use the implementation from the as_completed thread here on slack, will paste below
Copy code
from typing import Any, TypeVar, Iterable, Generator

from prefect.futures import PrefectFuture
from prefect.utilities.asyncio import Sync

from tenacity import retry
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_exponential


T = TypeVar("T")


@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=0, max=30))
def check_state(future: PrefectFuture[Any, Sync]) -> bool:
    state = future.get_state()
    return state.is_completed()


def as_completed(
    futures: Iterable[PrefectFuture[T, Sync]]
) -> Generator[PrefectFuture[T, Sync], None, None]:
    logger = get_run_logger()
    _futures = set(futures)
    completed = set()
    while len(completed) < len(_futures):
        for future in _futures.difference(completed):
            time.sleep(0.1)
            try:
                if check_state(future):
                    completed.add(future)
                    yield future
            except Exception as e:
                logger.exception(
                    f"🤬 Caught exception trying to get state from prefect api: {e}"
                )
                raise e
This is the implementation used for as completed and
Copy code
from typing import TypeVar

from tenacity import retry
from tenacity.wait import wait_exponential
from tenacity.stop import stop_after_attempt

from prefect.futures import PrefectFuture
from prefect.utilities.asyncio import Sync

T = TypeVar("T")
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=0, max=30),
)
def future_result(future: PrefectFuture[T, Sync]) -> T:
    return future.result()


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=0, max=30),
)
def future_wait(future: PrefectFuture[T, Sync]):
    future.wait()
These are the small utils for wait and direct result fetching used in flows. We use these to wrap futures from tasks
I understand that this issue is not the same as the 403 PermissionDenied - but I figure they might be related?
We also receive runtime errors raised by httpx consistently like this one:
Copy code
Encountered exception during execution:
Traceback (most recent call last):
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
    result = fn(*args, **kwargs)
  File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 48, in future_result
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 208, in result
    return sync(
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 219, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 138, in result
    raise data
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
    result = fn(*args, **kwargs)
  File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 48, in future_result
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 208, in result
    return sync(
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 219, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 138, in result
    raise data
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
    result = fn(*args, **kwargs)
  File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 48, in future_result
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 208, in result
    return sync(
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 219, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 138, in result
    raise data
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
    result = fn(*args, **kwargs)
  File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 48, in future_result
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 208, in result
    return sync(
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 219, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 138, in result
    raise data
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 407, in __call__
    result = fn(*args, **kwargs)
  File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 48, in future_result
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 208, in result
    return sync(
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 150, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 65, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/futures.py", line 219, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 138, in result
    raise data
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/task_runners.py", line 412, in wait
    return await future.result(timeout=timeout)
  File "/pipelines/.venv/lib/python3.9/site-packages/distributed/client.py", line 294, in _result
    raise exc.with_traceback(tb)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/engine.py", line 716, in begin_task_run
    return task_run.state
  File "/usr/local/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/client.py", line 1824, in __aexit__
    return await self._exit_stack.__aexit__(*exc_info)
  File "/usr/local/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_client.py", line 1975, in __aexit__
    await self._transport.__aexit__(exc_type, exc_value, traceback)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpx/_transports/default.py", line 332, in __aexit__
    await self._pool.__aexit__(exc_type, exc_value, traceback)
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 326, in __aexit__
    await self.aclose()
  File "/pipelines/.venv/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
    raise RuntimeError(
RuntimeError: The connection pool was closed while 1 HTTP requests/responses were still in-flight.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/engine.py", line 468, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/pipelines/.venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 54, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/pipelines/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/tmp/flow-script-<...>-flowg8kh63xy.py", line 222, in <...>_flow
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 324, in wrapped_f
    return self(f, *args, **kw)
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 404, in __call__
    do = self.iter(retry_state=retry_state)
  File "/pipelines/.venv/lib/python3.9/site-packages/tenacity/__init__.py", line 361, in iter
    raise retry_exc from fut.exception()
tenacity.RetryError: RetryError[<Future at 0x7f22b779c430 state=finished raised RuntimeError>]
Hey - did you guys find out anything on your end?
z

Zanie

05/24/2022, 6:37 PM
Hi we’re working a ton of things right now and we’re a small team. We’ll report back when we find something. I’d recommend opening an issue on GitHub for long-term tracking.
6 Views