# marvin-in-the-wild
User:
@Marvin: I have a task that calls `run_deployment` to initiate a subflow. The subflow executes fine, but before it's finished, the task errors with:
```
prefect.exceptions.PrefectHTTPStatusError: Server error '503 Service Unavailable' for url 'http://workflow.om.local/api/task_runs/de49b7b0-dd47-4868-8996-48c82a312a5e/set_state'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503
```
I would like to retry the task (because this networking error is hopefully ephemeral), but I am worried that will create another subflow (in addition to the one that is already running).
Marvin:
I couldn't find a direct solution in the Prefect documentation for retrying a task that has already started a subflow without starting that subflow a second time. One option is to control subflow execution manually, checking whether a subflow has already been started before initiating a new one; this may involve tracking your subflows' states in a database or other persistent storage. For the `503 Service Unavailable` error itself, you may want to configure retries with a delay (or backoff) so the task can ride out transient network issues. In Prefect 2.x this is done with the `@task` decorator parameters `retries` and `retry_delay_seconds` (the Prefect 1 equivalents were `max_retries` and `retry_delay`). As an additional measure, you might want to catch `PrefectHTTPStatusError` exceptions within your task and check for an existing subflow before initiating another one. The search didn't turn up any open GitHub issues about a `503 Service Unavailable` error in the context of retries and subflows. If the issue persists and isn't addressed by the documentation, consider opening a new issue in the Prefect GitHub repository.
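In Prefect 2.x, `retry_delay_seconds` also accepts a list with one delay per retry, and Prefect ships an `exponential_backoff` helper in `prefect.tasks` for this purpose. To show what a doubling backoff schedule looks like, here is a minimal plain-Python sketch; `backoff_delays` is a hypothetical helper, not part of Prefect. Keep in mind that retrying the whole task re-runs `run_deployment`, which is exactly the duplication concern raised in the question.

```python
from typing import List


def backoff_delays(base_seconds: float, retries: int) -> List[float]:
    """Return one delay per retry, doubling each time: base, 2*base, 4*base, ..."""
    if base_seconds < 0 or retries < 0:
        raise ValueError("base_seconds and retries must be non-negative")
    return [base_seconds * 2**attempt for attempt in range(retries)]


# Example: three retries starting at 10 seconds.
delays = backoff_delays(10, 3)
print(delays)  # [10, 20, 40]
```

With Prefect installed, such a list would be passed as, for example, `@task(retries=3, retry_delay_seconds=backoff_delays(10, 3))`.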
User:
@Marvin: how would I catch the `PrefectHTTPStatusError` in a way that avoids duplicate subflows?
Marvin:
To catch the `PrefectHTTPStatusError` and avoid duplicate subflows, you need error handling in the task that initiates the subflow, plus a record of which subflows have already been started. Here's an example of how you could approach this with Prefect 2.x:
```python
from prefect import flow, get_run_logger, runtime, task
from prefect.deployments import run_deployment
from prefect.exceptions import PrefectHTTPStatusError


def find_started_subflow(run_key: str):
    # Hypothetical placeholder: look up `run_key` in your own persistent
    # store (e.g. a database table) and return the recorded flow run ID, or None.
    return None


def record_started_subflow(run_key: str, flow_run_id) -> None:
    # Hypothetical placeholder: persist `flow_run_id` under `run_key`.
    pass


@task(retries=3, retry_delay_seconds=60)
def run_deployment_with_subflow(run_key: str, parameters: dict):
    logger = get_run_logger()

    # A retry of this task sees the record left by the earlier attempt.
    existing_id = find_started_subflow(run_key)
    if existing_id is not None:
        logger.info("Subflow %s already started; not starting another", existing_id)
        return existing_id

    try:
        # timeout=0 returns as soon as the flow run is created, so its ID can
        # be recorded before anything else has a chance to fail.
        flow_run = run_deployment(
            name="my-subflow/my-deployment",  # placeholder "<flow>/<deployment>"
            parameters=parameters,
            timeout=0,
        )
    except PrefectHTTPStatusError as exc:
        # PrefectHTTPStatusError subclasses httpx.HTTPStatusError.
        if exc.response.status_code == 503:
            logger.warning("Prefect API returned 503; the task will be retried")
        # Re-raise so the task's `retries` setting applies.
        raise

    record_started_subflow(run_key, flow_run.id)
    return flow_run.id


@flow
def main_flow():
    # `run_key` must be the same on every retry, so build it from stable
    # values such as the parent flow run ID, never from a timestamp.
    run_key = f"{runtime.flow_run.id}/subflow"
    run_deployment_with_subflow(run_key, {})


if __name__ == "__main__":
    main_flow()
```
The task first looks up `run_key` in your own persistent store and returns early if a subflow was already started for it, so a retry does not launch a second one. It calls `run_deployment` with `timeout=0`, which returns as soon as the flow run has been created, and records the new flow run ID straight away; the trade-off is that the task no longer waits for the subflow to finish. The `run_deployment` call is wrapped in a `try` block to catch `PrefectHTTPStatusError`; because it subclasses `httpx.HTTPStatusError`, the status code is available as `exc.response.status_code`. If a `503` occurs, the error is logged and re-raised so the task's `retries` setting triggers another attempt, which skips the subflow if it was already recorded. Note that a small window remains between the flow run being created and its ID being recorded. This example assumes you have a reliable way to determine whether the subflow has already been started, such as a database entry keyed by `run_key`; `find_started_subflow`, `record_started_subflow`, and the deployment name are placeholders to adapt to your environment.
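The check-before-start idea can be modeled in plain Python to show why a retry does not launch a second run. This is a minimal sketch, assuming a dict stands in for the persistent store and `fake_start` stands in for the call that actually launches the subflow; `start_once` and `fake_start` are hypothetical names, not Prefect APIs.

```python
from typing import Callable, Dict


def start_once(run_key: str, store: Dict[str, str], start: Callable[[], str]) -> str:
    """Start a run only if `store` has no run recorded under `run_key`."""
    existing = store.get(run_key)
    if existing is not None:
        # An earlier attempt already started the run: reuse it.
        return existing
    run_id = start()
    # Record the ID immediately so that a retry can find it.
    store[run_key] = run_id
    return run_id


# Simulate the original attempt and a retry using the same key.
launched = []


def fake_start() -> str:
    # Stand-in for launching the subflow; returns a made-up run ID.
    launched.append("run")
    return f"flow-run-{len(launched)}"


store: Dict[str, str] = {}
first = start_once("parent-123/subflow", store, fake_start)
second = start_once("parent-123/subflow", store, fake_start)
print(first, second, len(launched))  # flow-run-1 flow-run-1 1
```

The weak points of this approach are the gap between `start()` returning and the ID being stored, and the lack of locking between concurrent attempts; a server-side guard such as the `idempotency_key` argument of `run_deployment` is designed to avoid both.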
Nate:
🙅
this looks like a server side error, do you have server logs?
reading the above I think you want `retry_condition_fn` - @Marvin please search and explain
Marvin:
Certainly. `retry_condition_fn` is a task option that Prefect provides for implementing custom retry conditions. You pass a callable that receives the task, the task run, and its state, and returns `True` if the task should be retried and `False` otherwise. Here is an example of how you could use `retry_condition_fn` to handle retries with custom logic:
```python
import httpx
from prefect import flow, task


def retry_handler(task, task_run, state) -> bool:
    """This custom retry handler determines when to retry a task."""
    try:
        # For a failed task run, this re-raises the exception the task raised
        state.result()
    except httpx.HTTPStatusError as exc:
        # PrefectHTTPStatusError subclasses httpx.HTTPStatusError, so errors
        # from the Prefect API land here too.
        # Retry only when a 503 Service Unavailable error is encountered
        return exc.response.status_code == 503
    except Exception:
        # Retry for other exceptions as you see fit
        return True
    # The run did not fail, so there is nothing to retry
    return False


@task(retries=3, retry_delay_seconds=10, retry_condition_fn=retry_handler)
def some_task_that_might_fail():
    # Your logic that might raise HTTP status errors
    pass


@flow
def my_flow():
    some_task_that_might_fail()


if __name__ == "__main__":
    my_flow()
```
In the above example, `retry_handler` calls `state.result()`, which re-raises the exception from a failed run. If that exception is an `httpx.HTTPStatusError` with a status code of `503`, the function returns `True`, signaling Prefect to retry the task; other HTTP status codes are not retried. Keep in mind that a retry runs the task body again, including any `run_deployment` call, so you may still need to track whether a subflow has already been initiated, as discussed earlier. For more details, refer to the Prefect documentation on Custom Retry Behavior.
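The decision logic inside a retry handler can be kept as a small, testable predicate on the exception. This is a minimal sketch, assuming that 502, 503, and 504 are the codes you treat as transient (an illustrative choice) and using stand-in classes so it runs without httpx or Prefect installed; `status_code_of`, `should_retry`, `FakeResponse`, and `FakeHTTPStatusError` are hypothetical names. It relies only on the fact that `httpx.HTTPStatusError`, and therefore `PrefectHTTPStatusError`, exposes the code as `exc.response.status_code`.

```python
from typing import Optional

# Assumption: status codes treated as transient for this sketch.
TRANSIENT_STATUS_CODES = {502, 503, 504}


def status_code_of(exc: BaseException) -> Optional[int]:
    """Return the HTTP status code carried by `exc`, or None if it has none."""
    response = getattr(exc, "response", None)
    return getattr(response, "status_code", None)


def should_retry(exc: BaseException) -> bool:
    """Retry only on HTTP errors that are usually transient."""
    return status_code_of(exc) in TRANSIENT_STATUS_CODES


# Stand-ins mimicking the shape of httpx.HTTPStatusError.
class FakeResponse:
    def __init__(self, status_code: int) -> None:
        self.status_code = status_code


class FakeHTTPStatusError(Exception):
    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.response = FakeResponse(status_code)


print(should_retry(FakeHTTPStatusError(503)))  # True
print(should_retry(FakeHTTPStatusError(404)))  # False
print(should_retry(ValueError("bad input")))   # False
```

Inside a real `retry_condition_fn`, a handler could call `state.result()` in a `try` block and pass the caught exception to `should_retry`.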
User:
Thanks @Nate, the `retry_condition_fn` does look very useful. I don't have the server logs to hand right now, but my colleague has spun up a second Prefect server on a separate node to lower the risk of network issues. (Not sure if related, but) we also realised that the task had a max wait time of 2 hours, which we have now increased to 1 day.
@Marvin: the documentation says the `idempotency_key` of `run_deployment` is a "unique value to recognize retries of the same run, and prevent creating multiple flow runs." Can you explain with a simple example how this works in practice?
Marvin:
The `idempotency_key` prevents the creation of multiple flow runs when the same request is retried. It is an optional argument to `run_deployment`: when a flow run is created with a key that has already been used for the same flow, the server returns the existing flow run instead of creating a new one. So if the call is retried (for example, after a network error), the retry attaches to the run that already exists rather than starting a duplicate. Here's a simple example of how you might use it:
```python
from prefect import flow, runtime
from prefect.deployments import run_deployment


@flow(name="main_flow")
def main_flow():
    # The key must be identical on every retry of this call, so build it from
    # stable values such as the parent flow run ID, never from a timestamp.
    idempotency_key = f"{runtime.flow_run.id}-subflow"

    # `name` is "<flow name>/<deployment name>"; this one is a placeholder.
    subflow_run = run_deployment(
        name="subflow/subflow-deployment",
        idempotency_key=idempotency_key,
    )

    # If this call is repeated with the same key (e.g. after a network error),
    # Prefect returns the existing flow run instead of starting a duplicate.
    return subflow_run


if __name__ == "__main__":
    main_flow()
```
Each `idempotency_key` must be unique per logical run you want to protect, yet identical across retries of that run. In practice it is a string stored on the flow run that is unlikely to collide with other runs, for example one derived from the parent run ID or from a hash of the run's parameters. A UUID or timestamp only works if it is generated once and reused on every retry; generating a new one per attempt defeats the purpose. For more details, consult the `run_deployment` documentation.
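To see the mechanics without a Prefect server, here is a plain-Python model of the idempotency behavior. It is a sketch under assumptions: `FakeServer` only imitates the server handing back the existing flow run for a repeated key (keyed here by flow name and idempotency key), and `make_idempotency_key` is a hypothetical helper showing one way to derive a key that stays the same across retries, from a parent run ID (inside Prefect this could come from `prefect.runtime.flow_run.id`) plus a hash of the parameters.

```python
import hashlib
import json
from typing import Any, Dict, Tuple


def make_idempotency_key(parent_run_id: str, parameters: Dict[str, Any]) -> str:
    """Derive a key that is identical on every retry of the same call."""
    # sort_keys makes the JSON (and so the hash) independent of dict order.
    payload = json.dumps(parameters, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode()).hexdigest()[:16]
    return f"{parent_run_id}-{digest}"


class FakeServer:
    """Stand-in for the server: one flow run per (flow, idempotency key)."""

    def __init__(self) -> None:
        self.runs: Dict[Tuple[str, str], str] = {}

    def create_flow_run(self, flow_name: str, idempotency_key: str) -> str:
        key = (flow_name, idempotency_key)
        if key not in self.runs:
            self.runs[key] = f"flow-run-{len(self.runs) + 1}"
        # A repeated request gets the existing run back instead of a new one.
        return self.runs[key]


server = FakeServer()
params = {"date": "2024-01-01", "region": "eu"}  # illustrative parameters
key = make_idempotency_key("parent-123", params)

first_attempt = server.create_flow_run("subflow", key)
retry_attempt = server.create_flow_run("subflow", key)  # e.g. after a 503
print(first_attempt, retry_attempt, len(server.runs))  # flow-run-1 flow-run-1 1
```

Because `sort_keys=True` normalizes parameter order, the same parameters passed in a different order produce the same key, while a different parent run ID produces a new key and therefore a new flow run.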