Brian Newman
11/02/2023, 1:40 PM
Marvin
11/02/2023, 1:41 PM
merlin
01/16/2024, 6:37 PM
Brian Newman
01/17/2024, 3:10 AM
merlin
01/17/2024, 6:41 AM
merlin
01/18/2024, 12:21 AM
Brian Newman
# (message timestamp: 01/18/2024, 2:20 PM)
async def check_if_current_flow_is_running_in_prefect() -> bool:
    """Report whether another run of the current flow has matching parameters.

    Reads the current run's flow name, id and parameters from ``flow_run``,
    fetches the running flow runs that share that flow name, and compares
    every run other than the current one against the current parameters
    with ``flow_params_match``.

    NOTE(review): ``flow_run`` is presumably ``prefect.runtime.flow_run``;
    confirm against the module's imports.

    Returns:
        bool: True if another instance with the same parameters is running,
        False otherwise.
    """
    flow_run_name = flow_run.flow_name
    flow_run_id = flow_run.id
    flow_params = flow_run.parameters
    print(f"Checking if {flow_run_name} is running")
    flow_runs_list = await get_prefect_flow_runs_running_by_flow_name(
        flow_name=flow_run_name
    )
    for flow_run_item in flow_runs_list:
        print(flow_run_item)
        # Exclude the current run by id so it is never counted as its own
        # duplicate; only then are the parameters compared.
        if flow_run_item["id"] != flow_run_id and flow_params_match(
            flow_run_item["parameters"], flow_params
        ):
            return True
    return False
async def get_prefect_flow_runs_running_by_flow_name(flow_name: str) -> list[dict]:
    """Return the running flow runs for a given flow name.

    POSTs a filter to ``{PREFECT_API_URL}/flow_runs/filter`` that selects flow
    runs whose state type is ``RUNNING`` and whose flow name equals
    ``flow_name``. The request sends ``PREFECT_API_KEY`` as a bearer token.

    Args:
        flow_name (str): The name of the flow to get running flow runs for.

    Returns:
        list[dict]: The decoded JSON body of the response; the caller treats
        each entry as a flow run with ``"id"`` and ``"parameters"`` keys.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    url = f"{PREFECT_API_URL}/flow_runs/filter"
    headers = {
        "Authorization": f"Bearer {PREFECT_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "flow_runs": {
            "operator": "and_",
            "state": {
                "operator": "and_",
                "type": {"any_": ["RUNNING"]},
            },
        },
        "flows": {
            "name": {"any_": [flow_name]},
        },
    }
    async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
        response = await client.post(url, headers=headers, json=payload)
        # Fail loudly on an error status: otherwise the error body would be
        # returned as if it were the list of flow runs and the caller would
        # break later with an unrelated-looking error.
        response.raise_for_status()
        return response.json()
def flow_params_match(
    flow_run_item_params: dict[str, Any], flow_params: dict[str, Any]
) -> bool:
    """Check whether a flow run's parameters match the given flow parameters.

    Only keys present in both mappings are compared. A key of ``flow_params``
    that is missing from ``flow_run_item_params`` is skipped, and extra keys
    in ``flow_run_item_params`` are ignored. Before comparison, an ``Enum``
    value in ``flow_params`` is replaced by its ``.value``, and ``Enum``
    members directly inside a list are replaced the same way.

    Args:
        flow_run_item_params (dict[str, Any]): Parameters of a flow run item.
        flow_params (dict[str, Any]): Parameters of the current flow.

    Returns:
        bool: True if every shared key compares equal, False otherwise.
    """
    for key, value in flow_params.items():
        if key not in flow_run_item_params:
            # Nothing to compare against; a missing key is not a mismatch.
            continue
        # Work on a separate local instead of rebinding the loop variable.
        expected = value
        # Lists may contain Enum members; compare by their underlying values.
        if isinstance(expected, list):
            expected = [
                item.value if isinstance(item, Enum) else item for item in expected
            ]
        # A bare Enum is likewise compared by its underlying value.
        if isinstance(expected, Enum):
            expected = expected.value
        if flow_run_item_params[key] != expected:
            return False
    return True
Then at the beginning of the flow I do this.
flow_check: bool = await check_if_current_flow_is_running_in_prefect()
if flow_check:
<http://logger.info|logger.info>("The flow is cancelled")
return Cancelled(message="The flow is cancelled")