Hey, on prefect 2 cloud I randomly encounter "inte...
# ask-community
a
Hey, on Prefect 2 Cloud I randomly encounter "Internal Server Error"s when the agent communicates with the server (see the exception in the thread). I have a flow that spawns up to 50 tasks (each task finishes within seconds), and I currently run the flow every 5 minutes. Am I running into some sort of API limit because of too many tasks? Is there a way to prevent this internal server error? It happens on roughly every 15th run; all other runs work just fine.
File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 226, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'https://api.prefect.cloud/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/'
Response: {'exception_message': 'Internal Server Error'}
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 587, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 116, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 96, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "teeevents_flow.py", line 203, in save_nonblacklist_leads_to_salesforce
    update_synced_events.submit(bigquery_dataset_name, bigquery_table_name,
  File "/usr/local/lib/python3.9/site-packages/prefect/tasks.py", line 491, in submit
    return enter_task_run_engine(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 727, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 837, in create_task_run_then_submit
    task_run = await create_task_run(
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 877, in create_task_run
    task_run = await flow_run_context.client.create_task_run(
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 1693, in create_task_run
    response = await self._client.post(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 280, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 226, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'https://api.prefect.cloud/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/'
Response: {'exception_message': 'Internal Server Error'}
z
Hi! We’re getting a team together to investigate and resolve these errors. Have you set up retries on your flow run?
a
Hey, yes, I use retries (2 retries). But the flow immediately enters a failed state after this exception occurs, without attempting a retry. And it only ever occurs for this flow with a rather high number of tasks. My other flows mostly have at most 10 tasks or so, and there it never occurs. It might be reproducible by spawning a lot of tasks and running the flow at a rather short interval? Thanks a lot for investigating.
Do you want me to create a GitHub issue?
z
There are a couple of existing ones, but I think we’re lacking a simple MRE. If you can make one of those, that’d be really helpful 🙂
a
Hey Michael, I think I can try to create an MRE (if an MRE is an easy-to-reproduce example 😅😅😅). Just to be sure: what are the actual API/task limits with the cloud solution? I think I will simply create a flow with a good number of tasks and a task structure similar to my real-world flow. But this could generate quite a lot of API requests to Prefect, and I don't want to hit my "monthly quota" or the like while trying to reproduce the issue 👼
z
There are no usage limits right now!
We do enforce rate limits, but the client will retry when it encounters one.
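For readers unfamiliar with the pattern mentioned here, the following is a minimal, generic sketch of a client that retries rate-limited requests with exponential backoff. It is not Prefect's actual client code: `FakeResponse`, `send_with_retry`, and all parameters are hypothetical names invented for illustration. It also shows that a policy which retries only status 429 would pass a 500 straight through; whether that explains the immediate failure described earlier in the thread is an assumption, not something confirmed here.

```python
import random
import time


class FakeResponse:
    """Hypothetical stand-in for an HTTP response; it only carries a status code."""

    def __init__(self, status_code):
        self.status_code = status_code


def send_with_retry(send, max_attempts=5, base_delay=0.01,
                    retry_on=(429,), sleep=time.sleep):
    """Call send() until its status is not in retry_on or attempts run out.

    Assumption: only the status codes listed in retry_on are treated as
    retryable; anything else (including 500) is returned to the caller as-is.
    """
    response = None
    for attempt in range(max_attempts):
        response = send()
        if response.status_code not in retry_on:
            return response
        if attempt < max_attempts - 1:
            # Exponential backoff plus a little jitter to avoid synchronized retries.
            sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))
    return response


if __name__ == "__main__":
    attempts = []

    def rate_limited_twice():
        # Returns 429 on the first two calls, then 200.
        attempts.append(1)
        return FakeResponse(429 if len(attempts) < 3 else 200)

    result = send_with_retry(rate_limited_twice)
    print(result.status_code, len(attempts))
```

Adding 500 to `retry_on` in this sketch would make server errors retryable too; whether that is appropriate depends on whether the failed request is safe to repeat.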
a
Hey Michael, I was able to recreate a different issue with a simple example. I also encountered this one with the same flow as described above, so maybe they are related... The issue is: with a fairly large number of tasks, the flow execution sometimes just stops. No failure, no crash, no exception. If you run the following sample flow, it stops somewhere and never continues. Is there something I'm missing, or is this interesting for you guys? By "stopping" I simply mean that the log output stops at some point before all the tasks have finished and never resumes. I waited for about 10 minutes before concluding that the flow might be stuck 🙂 I ran the flow locally with Prefect 2.4.5 (python myfilename.py)
from time import sleep
from prefect import flow, task, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
import requests

@flow(name="my_favorite_function", description="Such description", task_runner=ConcurrentTaskRunner(), timeout_seconds=1000)
def my_favorite_function():
    logger = get_run_logger()
    logger.info("This is my hopefully failing flow.")

    test = call_api.submit("https://orf.at", 0, logger)
    test1 = call_api.submit("https://orf.at", 0, logger)
    test2 = call_api.submit("https://orf.at", 0, logger)

    ctr = 0
    while True:
        ctr += 1
        if ctr >= 200:
            break
        useless_nr = call_api.submit("https://orf.at", ctr, logger)
        another_api.submit("https://orf.at", ctr, useless_nr, logger)
    return

@task(name="call_api", description="Such wow!", retries=2, retry_delay_seconds=10)
def call_api(url, ctr, logger):
    logger.info("++++++ hello austrian news site +++++++" + str(ctr))
    response = requests.get(url, timeout=30)
    sleep(2)
    logger.info(response)
    logger.info(str(ctr))
    return response

@task(name="another_api", description="Such another wow!", retries=2, retry_delay_seconds=10)
def another_api(url, ctr, useless_nr, logger):
    logger.info("---- another ----- " + str(ctr))
    response = requests.get(url, timeout=30)
    sleep(2)
    logger.info("I'm so useless: " + str(useless_nr.status_code))
    logger.info(response)
    logger.info(str(ctr))
    return response

if __name__ == "__main__":
    my_favorite_function()
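
Since both problems in this thread only show up when many tasks are submitted at once, one thing that might be worth trying while narrowing down the cause is to bound how much work is in flight at a time. The sketch below is a generic, standard-library illustration of submitting work in batches and waiting for each batch before starting the next. It does not use Prefect, and `run_in_batches`, `batch_size`, and `max_workers` are hypothetical names chosen for this example; it is not a confirmed fix for either issue.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def run_in_batches(fn, items, batch_size=20, max_workers=10):
    """Run fn(item) for every item, submitting at most batch_size items at a time.

    Each batch must finish before the next one is submitted, so no more than
    batch_size calls are ever pending. Results are returned in input order.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            futures = [pool.submit(fn, item) for item in batch]
            wait(futures)  # block until the whole batch is done
            # result() re-raises any exception that fn raised in a worker.
            results.extend(future.result() for future in futures)
    return results


if __name__ == "__main__":
    squares = run_in_batches(lambda n: n * n, list(range(5)), batch_size=2)
    print(squares)
```

Waiting on whole batches is the simplest way to cap pending work; it trades some throughput for predictability, because one slow item holds up the next batch.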