Constantino Schillebeeckx
07/17/2025, 1:31 PM
ConcurrencySlotAcquisitionError: Unable to acquire concurrency limits ['dbt-task-daily-prod']
...
PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<https://api.prefect.cloud/api/.../concurrency_limits/increment>'
How should we handle these transient API errors at the flow level so that the flow gracefully retries when this happens?
Marvin
07/17/2025, 1:34 PM
```python
from prefect import flow


@flow(
    retries=3,  # Number of retries
    retry_delay_seconds=30,  # Wait 30 seconds between retries
)
def your_production_flow():
    # Your flow logic here
    pass
```
2. Custom retry handling: For more sophisticated control, you can implement a custom retry condition function that specifically handles API errors:
```python
from prefect import flow
from prefect.exceptions import PrefectHTTPStatusError
from typing import Optional


def should_retry_api_error(exc: Optional[Exception]) -> bool:
    """Determine if we should retry based on the exception."""
    if isinstance(exc, PrefectHTTPStatusError):
        # Retry on 500-level errors
        return exc.response.status_code >= 500
    return True  # Retry on other exceptions


@flow(
    retries=3,
    retry_delay_seconds=30,
    retry_condition_fn=should_retry_api_error,
)
def your_production_flow():
    # Your flow logic here
    pass
```
3. Exponential backoff: For even better resilience, you can implement exponential backoff:
```python
from prefect import flow
from prefect.utilities.annotations import exponential_backoff


@flow(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=2, jitter=True),
    retry_condition_fn=should_retry_api_error,
)
def your_production_flow():
    # Your flow logic here
    pass
```
This configuration will:
- Retry the flow up to 3 times after the initial attempt
- Use exponential backoff starting with a base delay and multiplying by 2 each time
- Add jitter to prevent thundering herd problems
- Retry on 500-level API errors and on any other exception (including concurrency issues), but not on 4xx API responses
The exponential backoff with jitter is particularly useful for concurrency-related issues as it helps prevent multiple retrying flows from all trying to acquire the slot at the same time.
For your specific case with concurrency limits, you might want to use slightly longer delays since you're waiting for a concurrency slot to become available. Here's what I'd recommend for a production flow:
```python
@flow(
    retries=5,  # More retries for production
    retry_delay_seconds=exponential_backoff(
        base=60,  # Start with 1 minute
        backoff_factor=2,  # Double each time
        jitter=True,
    ),
    retry_condition_fn=should_retry_api_error,
)
def your_production_flow():
    # Your flow logic here
    pass
```
This will give you retry intervals of approximately:
- 1st retry: ~60 seconds
- 2nd retry: ~120 seconds
- 3rd retry: ~240 seconds
- 4th retry: ~480 seconds
- 5th retry: ~960 seconds
The jitter will add some random variation to these times to prevent concurrent retry attempts.
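The intervals above follow a simple geometric rule, which can be sketched in plain Python. This only illustrates the arithmetic and is not Prefect's own implementation: `backoff_delays`, `with_jitter`, and the 20% jitter factor are assumptions chosen for the example.

```python
import random


def backoff_delays(base: float, factor: float, retries: int) -> list[float]:
    """Delay before each retry: base, base*factor, base*factor**2, ..."""
    return [base * factor**attempt for attempt in range(retries)]


def with_jitter(delay: float, jitter_factor: float, rng: random.Random) -> float:
    """Spread a delay uniformly over [delay*(1-j), delay*(1+j)]."""
    return delay * (1 + rng.uniform(-jitter_factor, jitter_factor))


# Un-jittered schedule for 5 retries starting at 60 seconds and doubling.
schedule = backoff_delays(base=60, factor=2, retries=5)

# Seeded generator so the example is repeatable; 0.2 is an assumed jitter factor.
rng = random.Random(0)
jittered = [with_jitter(d, 0.2, rng) for d in schedule]
```

Because every run draws its own random offset, two runs that fail at the same moment are unlikely to retry at exactly the same time.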
Remember that this is handling retries at the flow level. If you have specific tasks that need their own retry logic, you can also add similar retry configurations to individual task decorators.
Would you like me to provide any additional details about specific aspects of this solution?
Constantino Schillebeeckx
07/17/2025, 8:33 PM
`retry_condition_fn` - i can't find any docs on it. all i can find is task level settings
Marvin
07/17/2025, 8:34 PM
`retry_condition_fn` parameter for flows. This parameter is only available for tasks, not flows. Let me correct my previous response.
Here's the correct way to handle retries for your production flow:
1. Flow-level retries: You can only use basic retry configuration at the flow level:
```python
from prefect import flow


@flow(
    retries=3,  # Number of retries
    retry_delay_seconds=30,  # Wait 30 seconds between retries
)
def your_production_flow():
    # Your flow logic here
    pass
```
2. For more sophisticated retry handling, you'll need to implement it at the task level where you have access to `retry_condition_fn`:
```python
from prefect import flow, task
from prefect.exceptions import PrefectHTTPStatusError


def should_retry_api_error(task, task_run, state) -> bool:
    """Decide whether a failed task run should be retried."""
    try:
        state.result()  # Re-raises the exception that failed the task run
    except PrefectHTTPStatusError as exc:
        # Retry on 500-level errors
        return exc.response.status_code >= 500
    except Exception:
        return True  # Retry on other exceptions
    return True


@task(
    retries=3,
    retry_delay_seconds=30,
    retry_condition_fn=should_retry_api_error,
)
def task_with_custom_retries():
    # Your task logic here
    pass


@flow(
    retries=5,  # Basic retry configuration for the flow
    retry_delay_seconds=60,
)
def your_production_flow():
    task_with_custom_retries()
```
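The retry decision itself can be exercised without a Prefect server. The sketch below assumes, as Prefect's task documentation describes, that a task-level `retry_condition_fn` is called with the task, the task run, and the failed state, and that `state.result()` re-raises the original exception. `FakeResponse`, `FakeHTTPError`, and `FakeState` are hypothetical stand-ins invented for this example, not Prefect classes.

```python
from dataclasses import dataclass


@dataclass
class FakeResponse:
    status_code: int


class FakeHTTPError(Exception):
    """Hypothetical stand-in for an HTTP status error carrying a response."""

    def __init__(self, status_code: int):
        super().__init__(f"HTTP {status_code}")
        self.response = FakeResponse(status_code)


class FakeState:
    """Hypothetical stand-in for a failed state; result() re-raises the error."""

    def __init__(self, exc: Exception):
        self._exc = exc

    def result(self):
        raise self._exc


def should_retry(task, task_run, state) -> bool:
    """Retry on 5xx responses and non-HTTP errors; give up on 4xx."""
    try:
        state.result()
    except FakeHTTPError as exc:
        return exc.response.status_code >= 500
    except Exception:
        return True
    return True  # Fall through: defer to the configured retry policy


# The task and task_run arguments are unused here, so None is passed for both.
decisions = {
    "500": should_retry(None, None, FakeState(FakeHTTPError(500))),
    "404": should_retry(None, None, FakeState(FakeHTTPError(404))),
    "other": should_retry(None, None, FakeState(RuntimeError("slot unavailable"))),
}
```

Keeping the decision in a small function like this makes it easy to unit test the cases you care about before wiring it into a task decorator.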
For your specific case with concurrency limits and API errors, I would recommend:
1. Using basic flow-level retries with longer delays for the overall flow:
```python
from prefect import flow


@flow(
    retries=5,
    # Flows take a single numeric delay; exponential_backoff is a task-level helper
    retry_delay_seconds=60,  # Wait 1 minute between retries
)
def your_production_flow():
    # Your flow logic here
    pass
```
2. If you have specific tasks that interact with the API or need concurrency limits, implement more detailed retry logic at the task level.
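For calls made inside a task or flow body, one option that does not depend on any Prefect feature is a small retry wrapper in plain Python. The sketch below is an illustration under stated assumptions rather than a Prefect API: `call_with_retries`, `TransientAPIError`, and `flaky_call` are hypothetical names, and the `sleep` parameter exists only so the example can run without actually waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientAPIError(Exception):
    """Hypothetical marker for errors that are worth retrying (e.g. a 500)."""


def call_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying up to `retries` times on TransientAPIError.

    The delay doubles after each failed attempt; other exceptions propagate.
    """
    for attempt in range(retries + 1):  # 1 initial attempt + `retries` retries
        try:
            return fn()
        except TransientAPIError:
            if attempt == retries:
                raise  # Out of retries: surface the last error
            sleep(base_delay * 2**attempt)
    raise AssertionError("unreachable for retries >= 0")


# Demo: fail twice, then succeed; record delays instead of sleeping.
calls = {"count": 0}
waits: list[float] = []


def flaky_call() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientAPIError("500 Internal Server Error")
    return "ok"


result = call_with_retries(flaky_call, retries=3, base_delay=30.0, sleep=waits.append)
```

With `retries=3` the wrapper makes at most four attempts: the initial call plus three retries.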
I apologize for any confusion my earlier response may have caused. Would you like me to provide more details about task-level retry configurations or other aspects of handling these errors?
Constantino Schillebeeckx
07/17/2025, 8:35 PM
Marvin
07/17/2025, 8:36 PM
```bash
PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS=300  # 5 minutes
```
2. Handle it at the deployment level:
Instead of trying to handle it in the flow code (which can't catch these infrastructure errors), you could set up a deployment retry policy. This would need to be configured when you create the deployment:
```python
from prefect import flow
from prefect.deployments import Deployment


@flow
def your_production_flow():
    # your flow code
    pass


deployment = Deployment.build_from_flow(
    your_production_flow,
    name="production-deployment",
    retries=3,
    retry_delay_seconds=60,
)
```
3. Consider using work queues with multiple workers:
This won't directly solve the API error, but it provides redundancy if one worker encounters issues.
Given that this is affecting your primary production flow, I would recommend:
1. First, open a support ticket with Prefect Cloud about the 500 error you're seeing - this shouldn't happen and they should investigate.
2. In the meantime, implement the concurrency slot wait time increase:
```bash
PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS=300
```
3. Consider if your concurrency limits might be too restrictive. Sometimes these errors can occur if the limits are set too tightly and there's contention.
Would you like me to elaborate on any of these approaches? Also, if you're seeing this frequently, it would be valuable to know the pattern of when it occurs (time of day, concurrent flow runs, etc.) as it might point to a configuration issue rather than just an API error.