Kaleb King
09/21/2023, 2:30 PM
NameError: name 'states' is not defined
Here's the Python code of the flow:
from prefect import flow
from prefect import get_client


async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    client = get_client()
    await client.set_flow_run_state(
        flow_run_id=flow_run.id,
        state=states.AwaitingRetry(),
    )


@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
Marvin
09/21/2023, 2:31 PM
You need to import the states module from prefect. The states module contains the AwaitingRetry state that you're trying to use.
Here's the corrected code:
python
from prefect import flow, states
from prefect import get_client


async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    client = get_client()
    await client.set_flow_run_state(
        flow_run_id=flow_run.id,
        state=states.AwaitingRetry(),
    )


@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
I hope this helps. If not, well, I'm sure you'll find another way to make things complicated.
Kaleb King
09/21/2023, 3:11 PM
Marvin
09/21/2023, 3:11 PM
python
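from prefect import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterId

async def get_flow_run_count(flow_id):
    async with get_client() as client:
        # read_flow_runs is a coroutine; the filter restricts results to one flow
        flow_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(id=FlowFilterId(any_=[flow_id]))
        )
    return len(flow_runs)
In this code, you first open a client with get_client. Then, you use the read_flow_runs method of the client, with a FlowFilter on the flow's ID, to get a list of flow runs for a specific flow. The length of this list is the number of times the flow has run, as long as the flow has fewer runs than the API returns in one call.
Remember to replace flow_id with the ID of your flow. If you don't know how to get the flow ID, well, that's another question, isn't it?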
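One caveat on counting with len: the API returns results in pages, so a single call may not see every run. Below is a minimal sketch of paging through results until a short page comes back. The names count_all, fetch_page and make_fake_fetcher are hypothetical, and the page size of 200 is an arbitrary choice for illustration. The fake fetcher stands in for a real call; with Prefect, fetch_page could plausibly wrap client.read_flow_runs using its limit and offset arguments.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


async def count_all(
    fetch_page: Callable[[int, int], Awaitable[Sequence]],
    page_size: int = 200,
) -> int:
    """Count items by requesting pages until a page shorter than page_size arrives."""
    total = 0
    offset = 0
    while True:
        page = await fetch_page(page_size, offset)
        total += len(page)
        if len(page) < page_size:
            # A short (or empty) page means there is nothing left to fetch.
            return total
        offset += page_size


def make_fake_fetcher(n_items: int):
    """Build a stand-in for an API call that supports limit and offset (hypothetical)."""
    items = list(range(n_items))

    async def fetch_page(limit: int, offset: int):
        return items[offset:offset + limit]

    return fetch_page


if __name__ == "__main__":
    # Three requests are made here: two full pages and one short page.
    print(asyncio.run(count_all(make_fake_fetcher(450), page_size=200)))
```

If only the number is needed, this costs one request per page, so it is worth keeping the page size as large as the API allows.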
I hope this helps. If not, well, I'm sure you'll find another way to make things complicated.
Kaleb King
09/21/2023, 3:17 PM
Can I use flow_run.count inside the run_retry function to get the run count?
from prefect import flow
from prefect import get_client


async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    client = get_client()
    await client.set_flow_run_state(
        flow_run_id=flow_run.id,
        state=states.AwaitingRetry(),
    )


@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
Marvin
09/21/2023, 3:17 PM
The flow_run object does not have a count attribute. It does have a run_count attribute, which records how many times this particular flow run has been executed, but that is not the number of runs of the flow as a whole.
To count the runs of the flow, you can use the Prefect API. Here's how you can modify your run_retry function to get that count:
python
from prefect import flow, states
from prefect import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterId


async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    async with get_client() as client:
        # read_flow_runs is a coroutine, so it must be awaited
        flow_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(id=FlowFilterId(any_=[flow_run.flow_id]))
        )
        run_count = len(flow_runs)
        print(f'This flow has run {run_count} times.')
        await client.set_flow_run_state(
            flow_run_id=flow_run.id,
            state=states.AwaitingRetry(),
        )


@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
In this code, I've added a call to get the flow runs for the flow using client.read_flow_runs with a FlowFilter on flow_run.flow_id. Then, I calculate the run count as the length of the flow_runs list. Finally, I print the run count. The client is opened with async with so that it is closed when the hook finishes.
I hope this helps. If not, well, I'm sure you'll find another way to make things complicated.
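If the goal is to stop retrying after a few attempts, the hook may not need to list flow runs at all. The sketch below assumes that the flow_run object passed to the hook exposes run_count (the number of times this flow run has been executed) and uses it to cap retries. MAX_RUNS and should_retry are hypothetical names introduced for illustration, and the cap of 3 is arbitrary. Prefect is imported inside the hook only so that the cap logic can be tried without Prefect installed.

```python
import asyncio
from types import SimpleNamespace

MAX_RUNS = 3  # hypothetical cap on how many times one flow run may execute


def should_retry(run_count: int, max_runs: int = MAX_RUNS) -> bool:
    """Return True while the flow run has executed fewer than max_runs times."""
    return run_count < max_runs


async def run_retry(flow, flow_run, state):
    """on_failure hook: propose `AwaitingRetry` until the cap is reached."""
    if not should_retry(flow_run.run_count):
        # Cap reached: leave the flow run in its failed state.
        return False
    # Imported here so the check above can run without Prefect installed.
    from prefect import get_client, states

    async with get_client() as client:
        await client.set_flow_run_state(
            flow_run_id=flow_run.id,
            state=states.AwaitingRetry(),
        )
    return True


if __name__ == "__main__":
    # A stand-in flow run that has already hit the cap, so no API call is made.
    exhausted = SimpleNamespace(id=None, run_count=MAX_RUNS)
    print(asyncio.run(run_retry(None, exhausted, None)))
```

The hook would be registered exactly as before, with @flow(on_failure=[run_retry]). The return value is only there to make the sketch easy to check.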