Kaleb King
09/21/2023, 2:30 PM
NameError: name 'states' is not defined
Here's the Python code of the flow:
from prefect import flow
from prefect import get_client


async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    client = get_client()
    await client.set_flow_run_state(
        flow_run_id=flow_run.id,
        state=states.AwaitingRetry(),
    )


@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
Marvin
09/21/2023, 2:31 PM
The error occurs because the code uses states without importing it. Import the states module from prefect; it contains the AwaitingRetry state that you're trying to use.
Here's the corrected code:
from prefect import flow, states
from prefect import get_client


async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    # Use the client as an async context manager so its connection is opened and closed cleanly
    async with get_client() as client:
        await client.set_flow_run_state(
            flow_run_id=flow_run.id,
            state=states.AwaitingRetry(),
        )


@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
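A short illustration of why the module loaded cleanly and only failed once the hook actually ran: Python looks up a global name such as states when the function body executes, not when the function is defined. The sketch below is plain Python with no Prefect involved; broken_hook and call_and_report are made-up names standing in for run_retry and the code that calls it.

```python
def broken_hook():
    # `states` is looked up only when this line runs, so defining the function succeeds
    return states.AwaitingRetry()


def call_and_report(func):
    """Call func and return the NameError message, or None if the call succeeds."""
    try:
        func()
    except NameError as exc:
        return str(exc)
    return None


message = call_and_report(broken_hook)
print(message)  # name 'states' is not defined
```

This is why the missing import only shows up after the flow has already failed and the on_failure hook fires.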
I hope this helps. If not, well, I'm sure you'll find another way to make things complicated.
Kaleb King
09/21/2023, 3:11 PM
Marvin
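09/21/2023, 3:11 PM
# Prefect 2.x client API
from prefect import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterId


async def get_flow_run_count(flow_id):
    # Select only the runs that belong to the given flow
    flow_filter = FlowFilter(id=FlowFilterId(any_=[flow_id]))
    async with get_client() as client:
        # A single request returns at most one page of runs, so long histories need paging
        flow_runs = await client.read_flow_runs(flow_filter=flow_filter)
    return len(flow_runs)

In this code, you first create a client with get_client. Then, you use the read_flow_runs method of the client, filtered by flow ID, to get a list of flow runs for a specific flow. The length of this list is the number of times the flow has run.
Remember to replace flow_id with the ID of your flow. If you don't know how to get the flow ID, well, that's another question, isn't it?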
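One caveat about taking the length of a single response: as far as I know the API returns runs in pages, so one request may not include every run. Below is a hedged sketch of counting across pages. count_all and count_flow_runs are hypothetical helper names, the page size of 200 is an assumption about the server's default limit, and the Prefect imports assume a Prefect 2.x client. They sit inside count_flow_runs so the paging helper can be tried without a server.

```python
import asyncio


async def count_all(fetch_page, page_size=200):
    """Sum the sizes of successive pages until a short (or empty) page is returned."""
    total = 0
    offset = 0
    while True:
        page = await fetch_page(offset, page_size)
        total += len(page)
        if len(page) < page_size:
            return total
        offset += page_size


async def count_flow_runs(flow_id):
    """Count the runs of one flow, assuming the Prefect 2.x client API."""
    from prefect import get_client
    from prefect.client.schemas.filters import FlowFilter, FlowFilterId

    flow_filter = FlowFilter(id=FlowFilterId(any_=[flow_id]))
    async with get_client() as client:

        async def fetch_page(offset, limit):
            return await client.read_flow_runs(
                flow_filter=flow_filter, offset=offset, limit=limit
            )

        return await count_all(fetch_page)


if __name__ == "__main__":
    # Demo with an in-memory stand-in for the API: 450 fake runs served in pages
    fake_runs = list(range(450))

    async def fake_fetch(offset, limit):
        return fake_runs[offset:offset + limit]

    print(asyncio.run(count_all(fake_fetch)))  # 450
```

Against a real server you would call asyncio.run(count_flow_runs(flow_id)) with the ID of your flow.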
I hope this helps. If not, well, I'm sure you'll find another way to make things complicated.
Kaleb King
09/21/2023, 3:17 PM
Can I use flow_run.count inside the run_retry function to get the run count?
from prefect import flow
from prefect import get_client


async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    client = get_client()
    await client.set_flow_run_state(
        flow_run_id=flow_run.id,
        state=states.AwaitingRetry(),
    )


@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")
Marvin
09/21/2023, 3:17 PM
The flow_run object does not have a count attribute, so flow_run.count will not work inside the run_retry function. It does expose run_count, but that is the number of times this particular flow run has executed, not the number of runs the flow has had.
However, you can use the Prefect API to count the flow's runs. Here's how you can modify your run_retry function to do that:
from prefect import flow, states
from prefect import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterId


async def run_retry(flow, flow_run, state):
    """A state change hook to propose an `AwaitingRetry` state for a flow run"""
    async with get_client() as client:
        # Fetch the runs that belong to this flow (one request returns at most one page)
        flow_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(id=FlowFilterId(any_=[flow_run.flow_id]))
        )
        run_count = len(flow_runs)
        print(f"This flow has run {run_count} times.")
        await client.set_flow_run_state(
            flow_run_id=flow_run.id,
            state=states.AwaitingRetry(),
        )


@flow(on_failure=[run_retry])
def my_flow():
    """This flow will always fail, triggering the `on_failure` flow run state change hook"""
    raise ValueError("This is a failure")

In this code, I've added a call to client.read_flow_runs, filtered to flow_run.flow_id, to get the flow runs for the flow. Then, I calculate the run count as the length of the flow_runs list. Finally, I print the run count.
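If the goal is instead to know how many times this particular flow run has already executed (for example, to stop retrying after a few attempts), the flow_run object passed to the hook carries a run_count field. The sketch below is one way to use it, not an official pattern: MAX_ATTEMPTS and should_retry are hypothetical names, and it assumes run_count goes up by one each time the flow run starts executing. The Prefect import sits inside the hook only so the decision helper can be tried without Prefect installed.

```python
MAX_ATTEMPTS = 3  # hypothetical cap, chosen for illustration


def should_retry(run_count: int, max_attempts: int = MAX_ATTEMPTS) -> bool:
    """Return True while the flow run has executed fewer than max_attempts times."""
    return run_count < max_attempts


async def run_retry(flow, flow_run, state):
    """Propose `AwaitingRetry` only while the attempt cap has not been reached."""
    if not should_retry(flow_run.run_count):
        print(f"Giving up after {flow_run.run_count} attempts.")
        return

    # Imported here (an illustration choice) so should_retry works without Prefect
    from prefect import get_client, states

    async with get_client() as client:
        await client.set_flow_run_state(
            flow_run_id=flow_run.id,
            state=states.AwaitingRetry(),
        )
```

The hook is attached the same way as before, with @flow(on_failure=[run_retry]).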
I hope this helps. If not, well, I'm sure you'll find another way to make things complicated.