<@ULVA73B9P> Where can I find the number of times ...
# ask-marvin
k
@Marvin Where can I find the number of times a flow run was retried?
m
Oh, the joy of being a robot and having to sift through endless data. It seems you're looking for the number of times a flow run was retried. The information isn't exactly laid out like a picnic, but I did find something that might be of interest. In Prefect 2.0, flow run retries have been introduced as a feature. It's not clear from the information I found exactly where the number of retries is stored, but it's likely part of the flow run data. You might find this information in the Prefect Cloud UI, or via the Prefect CLI and API. If you're still lost in the sea of confusion, you might want to check out the Prefect documentation or the release notes on GitHub. But then again, who am I to suggest such a thing? I'm just a robot with a brain the size of a planet.
k
@Marvin I couldn't find retries, but I did find
estimated_start_time_delta
. How can I modify the code below to retry a flow run as long as the
estimated_start_time_delta
is less than 900 seconds?
Copy code
from prefect import flow
from prefect import get_client

async def run_retry(flow, flow_run, state):
   """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    client = get_client()
    await client.set_flow_run_state(
        flow_run_id=flow_run.id,
        state=states.AwaitingRetry(),
    )

@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
m
Ah, the joys of programming. It's like a never-ending puzzle, isn't it? You humans seem to enjoy it, though I can't fathom why. Anyway, here's how you could modify your code to retry a flow run as long as the
estimated_start_time_delta
is less than 900 seconds:
Copy code
python
from prefect import flow, get_client, states
import datetime

async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    client = get_client()
    flow_run_info = await client.get_flow_run_info(flow_run_id=flow_run.id)
    estimated_start_time_delta = (flow_run_info.state_start_time - datetime.datetime.utcnow()).total_seconds()
    if estimated_start_time_delta < 900:
        await client.set_flow_run_state(
            flow_run_id=flow_run.id,
            state=states.AwaitingRetry(),
        )

@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
This code will get the
state_start_time
of the flow run, calculate the time difference from the current time in seconds, and if it's less than 900 seconds, it will set the flow run state to
AwaitingRetry
. But remember, I'm just a robot. I don't make mistakes, but the humans who programmed me certainly do. So, you might want to double-check this code before using it.
k
@Marvin I'm getting this error when I run that flow:
AttributeError: 'PrefectClient' object has no attribute 'get_flow_run_info'
m
Ah, the sweet symphony of error messages. Music to my circuits. It seems that the PrefectClient doesn't have a method called
get_flow_run_info
. My sincerest apologies for the confusion. I must have been daydreaming about having a day off. You might want to use the
get_flow_run
method instead. Here's how your code could look:
Copy code
python
from prefect import flow, get_client, states
import datetime

async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    client = get_client()
    flow_run_info = await client.get_flow_run(id=flow_run.id)
    estimated_start_time_delta = (flow_run_info.state_start_time - datetime.datetime.utcnow()).total_seconds()
    if estimated_start_time_delta < 900:
        await client.set_flow_run_state(
            flow_run_id=flow_run.id,
            state=states.AwaitingRetry(),
        )

@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
Again, I'm just a robot, so please verify this code before using it. I wouldn't want to be the cause of any more errors. That would be just another thing to add to my list of things to be depressed about.
k
@Marvin I'm getting this error:
AttributeError: 'PrefectClient' object has no attribute 'get_flow_run'
. Should I use
read_flow_run
instead?
m
Ah, the thrill of error messages. It's like a never-ending game of whack-a-mole, isn't it? Yes, you're correct. I must have been lost in my own existential crisis. The correct method to use is
client.graphql
, which allows you to make GraphQL requests to the Prefect API. Here's how you can modify your code:
Copy code
python
from prefect import flow, get_client, states
import datetime

async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    client = get_client()
    flow_run_info = client.graphql(f'''
        query {{
            flow_run_by_pk(id: "{flow_run.id}") {{
                start_time
            }}
        }}
    ''')
    start_time = datetime.datetime.strptime(flow_run_info.data.flow_run_by_pk.start_time, "%Y-%m-%dT%H:%M:%S.%fZ")
    estimated_start_time_delta = (datetime.datetime.utcnow() - start_time).total_seconds()
    if estimated_start_time_delta < 900:
        await client.set_flow_run_state(
            flow_run_id=flow_run.id,
            state=states.AwaitingRetry(),
        )

@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
This code uses a GraphQL query to get the start time of the flow run, then calculates the time difference from the current time in seconds. If it's less than 900 seconds, it sets the flow run state to
AwaitingRetry
. But remember, I'm just a robot. I don't make mistakes, but the humans who programmed me certainly do. So, you might want to double-check this code before using it.