# ask-community
n
Hello Prefect community, how can I retry a task on specific error codes (500) with the same inputs, but using conditional logic based on the number of retries? For example, if the previous error code was 500 and the current retry count is 2, follow this particular path; otherwise follow the standard path. Our OCR pipeline has a long-running task that often returns 500 if the file cannot be read. When we receive this error, we want to do postprocessing on the PDF (converting it to an image) and rerun the task.
n
hi @Nicolas Ouporov - you could use `return_state` here in combination with `retries`
```python
import httpx
from prefect import flow, task
from prefect.states import Failed

@task(retries=1)
def read_file(url: str):
    try:
        r = httpx.get(url)
        r.raise_for_status()
    except httpx.HTTPStatusError as e:
        # Return a Failed state that carries the status code in its message
        return Failed(message=f"got a {e.response.status_code}")

@flow
def ocr():
    # return_state=True hands back the task's final state instead of raising
    state = read_file("http://google.com/api", return_state=True)
    if state.is_failed():
        if "404" in state.message: # or whatever you need to check for
            print("doing something specific + trying again")
            read_file("https://www.prefect.io")
        else:
            raise RuntimeError(state.message)
    return "all good"
```
n
@Nate Hell yeah nate. You are the 🐐
@Nate Will this work in an async context? My code looks like this:
```python
from typing import List

@flow
async def run_ocr_in_batches(
    all_file_contents: List[bytes], metadata: List[dict], batch_size: int = 10
):
    tasks = []
    client_count = len(document_intelligence_clients)

    for i, batch in enumerate(chunks(zip(all_file_contents, metadata), batch_size)):
        batch_list = list(batch)

        client = document_intelligence_clients[i % client_count]

        # file_meta avoids shadowing the `metadata` parameter
        for file_content, file_meta in batch_list:
            # Schedule the task for execution and store the future
            future = run_ocr.submit(
                client, file_content, file_meta["file_path"], file_meta["page_count"], i % client_count
            )
            tasks.append((future, file_meta["file_path"]))

    # Resolve the futures and gather results
    results = []
    for future, file_path in tasks:
        ocr_result = future.result()
        processed_result = process_ocr_results(ocr_result, file_path)
        results.append(processed_result)

    return results
```
n
yeah shouldn't make a difference if it's async or sync, `return_state` and `retries` should work the same way, just have to `await` your `async` stuff
looks like you could also be using map ^ if you wanted
n
Awesome. Wdym by map?
n
```python
In [1]: from prefect import flow, task, unmapped

In [2]: @task
   ...: def foo(dynamic, static):
   ...:     print(f"got {dynamic} and {static}")
   ...:     return dynamic
   ...:

In [3]: @flow(log_prints=True)
   ...: def f():
   ...:     futures = foo.map(range(10), unmapped(42))
   ...:     more_futures = foo.map(futures, unmapped("baz")) # futures get resolved by us
   ...:     print([f.result() for f in more_futures])
   ...:

In [4]: f()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```
`map` calls `submit` for each item in an iterable input, and you can have `unmapped` or `static` parameters that don't get mapped over
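As a rough analogy only (this uses the standard library's `ThreadPoolExecutor`, not Prefect's implementation), the pattern behind `map` with an `unmapped` argument is one `submit` per item, with the static value repeated each time. The helper name `map_with_static` is hypothetical, and `foo` only loosely mirrors the task in the session above.

```python
from concurrent.futures import ThreadPoolExecutor


def foo(dynamic, static):
    return f"got {dynamic} and {static}"


def map_with_static(fn, items, static):
    """Submit fn once per item, passing the same static value every time."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fn, item, static) for item in items]
        # Resolve in submission order, like iterating over the mapped futures.
        return [future.result() for future in futures]


outputs = map_with_static(foo, range(3), 42)
```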
n
big thanks - are there any prefect docs on this?
n
thanks
Hey @Nate is there a way to use the standard prefect retry logic like `@task(retries=2, retry_delay_seconds=[1, 30], retry_jitter_factor=0.5)` when the error code is 429 and use the custom logic as you suggested when the error code is 500?
n
yep, there's `retry_condition_fn`
n
@Nate Yeah I'm aware of the retry_condition_fn - just not sure how I would incorporate its usage with the code you provided for conditional retries, where we are implementing conditional logic for the retry.
```python
@task(retries=1)
def read_file(url: str):
    try:
        r = httpx.get(url)
        r.raise_for_status()
    except httpx.HTTPStatusError as e:
        return Failed(message=f"got a {e.response.status_code}")

@flow
def ocr():
    state = read_file("http://google.com/api", return_state=True)
    if state.is_failed():
        if "404" in state.message: # or whatever you need to check for
            print("doing something specific + trying again")
            read_file("https://www.prefect.io")
        else:
            raise RuntimeError(state.message)
    return "all good"
```
n
hmm i think i lack context on why you'd need both
n
When our API call for OCR fails with a 429 error code, it means that we are hitting a rate limit, so we should retry the same API call a few seconds later using Prefect's standard retry logic (exponential backoff, jitter, etc.). However, if the response we get from the OCR is a 500 (internal server error), it means there are files we sent to the API that cannot be processed. In this case, we want to get the page that failed (provided in the API error response), convert that bad page into an image, and retry the task with the modified input.
n
gotcha, then yeah i’d just keep those retry settings you have there and return a Failed state that contains the status code. that state will then be available in the handler you pass to retry_condition_fn. use the status code to return True if 429 (retry settings will be respected) and False if 500 (no more retries). you can then have an on_failure hook (which runs after retries are exhausted or canceled via the retry handler) to persist details about your failure for later
and if you need to immediately act on the Failed state from the caller's perspective, you can keep the return_state stuff i shared earlier
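A minimal sketch of the handler described above, assuming the task returns `Failed(message=f"got a {status_code}")` as in the earlier snippet. The names `status_code_from`, `retry_only_on_429`, `record_failure`, and `failure_log` are hypothetical; the `(task, task_run, state)` signature is the one Prefect passes to `retry_condition_fn` and to `on_failure` hooks. The decorator wiring is left as a comment so the helpers can be read and tried without Prefect installed, and `SimpleNamespace` stands in for a real state in the demo lines.

```python
import re
from types import SimpleNamespace
from typing import Optional


def status_code_from(message: Optional[str]) -> Optional[int]:
    """Pull the first 3-digit HTTP status code out of a state message."""
    match = re.search(r"\b([1-5]\d{2})\b", message or "")
    return int(match.group(1)) if match else None


def retry_only_on_429(task, task_run, state) -> bool:
    """Return True to let Prefect apply the task's retry settings."""
    return status_code_from(state.message) == 429


failure_log = []  # hypothetical stand-in for wherever you persist failures


def record_failure(task, task_run, state) -> None:
    """on_failure hook: keep details of the final failure for later handling."""
    failure_log.append(state.message)


# Assumed wiring, reusing the retry settings from the thread:
# @task(
#     retries=2,
#     retry_delay_seconds=[1, 30],
#     retry_jitter_factor=0.5,
#     retry_condition_fn=retry_only_on_429,
#     on_failure=[record_failure],
# )
# def run_ocr(...): ...

# Demo with stand-in states (not real Prefect states):
rate_limited = SimpleNamespace(message="got a 429")
server_error = SimpleNamespace(message="got a 500")
should_retry_429 = retry_only_on_429(None, None, rate_limited)
should_retry_500 = retry_only_on_429(None, None, server_error)
```

With this split, a 429 goes through the configured backoff and jitter, while a 500 fails immediately, so the caller can inspect the state and rerun with a modified input.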
n
Ok perfect. And how can we keep the whole flow from failing when one or more tasks fail? That is what's happening right now.
n
that’s what return_state is for. you get the state back and you can check if is_failed() before you try to “unwrap” it with .result() (which would raise the error if the underlying task run failed) - there’s docs if you search “final state determination”
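To make that concrete, here is a rough sketch of splitting states into successes and failures before unwrapping. `FakeState` is a hypothetical stand-in that mimics only the `is_failed()`, `result()`, and `message` parts of a Prefect state so the helper runs without Prefect; `split_states` is likewise an illustrative name. In a real flow the states would come from task calls made with `return_state=True`.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect state (illustration only)."""
    value: Any = None
    message: Optional[str] = None
    failed: bool = False

    def is_failed(self) -> bool:
        return self.failed

    def result(self) -> Any:
        # Like a real failed state, unwrapping raises instead of returning.
        if self.failed:
            raise RuntimeError(self.message)
        return self.value


def split_states(states: List[Any]) -> Tuple[List[Any], List[Optional[str]]]:
    """Unwrap successful states; collect messages from failed ones."""
    results, failures = [], []
    for state in states:
        if state.is_failed():
            failures.append(state.message)  # handle or log; keep going
        else:
            results.append(state.result())
    return results, failures


states = [
    FakeState(value="page text"),
    FakeState(message="got a 500", failed=True),
]
results, failures = split_states(states)
```

Because `result()` is only called on states that did not fail, one bad document no longer raises inside the flow body.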
n
Thanks Nate! Top tier support