Shaoyi Zhang
07/09/2024, 8:59 PMMarvin
07/09/2024, 8:59 PMpython
from prefect import flow, task, get_client
from prefect.states import Cancelled
@task
async def check_running_flows(deployment_id: str, flow_name: str) -> bool:
async with get_client() as client:
running_flows = await client.read_flow_runs(
deployment_id=deployment_id,
state_names=["Running"],
flow_name=flow_name
)
return len(running_flows) > 0
@task
def do_something():
return "Task executed!"
@flow
async def conditional_flow(deployment_id: str, flow_name: str):
running = await check_running_flows(deployment_id, flow_name)
if running:
print("Flow is already running, skipping execution.")
return Cancelled()
result = do_something()
print(result)
if __name__ == "__main__":
deployment_id = "your_deployment_id"
flow_name = "conditional_flow"
conditional_flow(deployment_id, flow_name)
***Explanation:***
1. check_running_flows
Task: This task queries the Prefect API to see if any runs of the given flow are currently in the "Running" state.
2. do_something
Task: This is just a placeholder for whatever work you want your flow to do.
3. conditional_flow
Flow: This checks the result from check_running_flows
. If a flow run is already active, it prints a message and returns a Cancelled
state. Otherwise, it executes the do_something
task.
This setup ensures that your flow skips execution if there is already an active run of the specified deployment.
For more information, you can explore the Prefect conditional tasks documentation. Let me know if I can help you with anything else.