@Marvin I have scheduled a deployment to run ...
# ask-marvin
b
@Marvin I have scheduled a deployment to run every 15 minutes. However, there are instances where it may take longer to complete, which could potentially trigger the commencement of the subsequent deployment while the current one is still running. To prevent overlapping executions, I would prefer either to have the upcoming deployment automatically cancel itself, or to have the subsequent deployments adjust their schedule to realign with the original 15-minute interval. One solution I am considering is dedicating a worker pool to this deployment with a concurrency limit set to 1, assuming I've interpreted the documentation accurately. Would this approach work, or is there a better alternative to ensure that only one deployment runs at a time?
m
Based on the information I found, it seems like your approach of dedicating a worker pool to this deployment with a concurrency limit set to 1 could work. This would ensure that only one deployment runs at a time. Work pools can be used to prioritize (or limit) flow runs through the use of work queues. You can configure default infrastructure configurations on your work pools that all jobs inherit and can override. However, I would recommend checking the Prefect documentation or community forums for more specific information or potential alternatives. Here are some resources that might help:
- Deploying Flows
- Discussion on Locking mechanism used by the agents
- Using Workers and Work Pools
m
@Brian Newman Did the suggested worker pool solve the problem? I anticipate that all subflows would be forced into single task sequential work, which defeats the purpose in my use-case.
b
The worker pool did work, but I didn't want jobs to stack up, so I went a different route. Also, the limit only affected the main flow, not the subflows started from it. I run the flow at the same cadence, and a function at the beginning of the flow uses the API to check whether another run with the same flow name is already running. If one is, I return a Cancelled Prefect state.
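The difference between the two approaches (a concurrency limit of 1 versus a check-and-cancel at the start of the flow) can be sketched without Prefect at all. This is only a simulation under stated assumptions: an `asyncio.Lock` stands in for the concurrency limit, and `simulate`, `run`, and the timings are made up for illustration.

```python
import asyncio


async def simulate(skip_if_running: bool) -> list[str]:
    """Trigger three runs 0.02 s apart; each run takes 0.1 s."""
    lock = asyncio.Lock()  # stands in for a concurrency limit of 1
    log: list[str] = []

    async def run(name: str) -> None:
        # Check-and-cancel: give up immediately if another run holds the lock.
        if skip_if_running and lock.locked():
            log.append(f"{name}: cancelled")
            return
        # Limit only: a late run waits here until the earlier one finishes.
        async with lock:
            await asyncio.sleep(0.1)
            log.append(f"{name}: completed")

    tasks = []
    for i in range(3):
        tasks.append(asyncio.create_task(run(f"run-{i}")))
        await asyncio.sleep(0.02)  # next scheduled trigger
    await asyncio.gather(*tasks)
    return log


queued = asyncio.run(simulate(skip_if_running=False))
skipped = asyncio.run(simulate(skip_if_running=True))
print(queued)
print(skipped)
```

With the lock alone, all three runs eventually complete one after another, which is the stacking-up behaviour. With the check, the second and third runs are cancelled while the first is still running.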
m
Cool news for me! I only want to limit the main flow and let many subflows exist, so this is very helpful information for my use-case. Can you share a snippet to demo the API check for a running instance of the flow? I've been puzzling over that, and it's key functionality for a generic dependency-checking feature. Thanks again for the very helpful comment! ❤️🙌
@Brian Newman Please share a snippet of the API check for a running flow run of the same flow.
b
Sorry for the delay! That is great news!!
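import os
from enum import Enum
from typing import Any

import httpx
from prefect.runtime import flow_run
from prefect.states import Cancelled

# Assumption (not shown in the original snippet): the API URL and key are read
# from the standard Prefect environment variables.
PREFECT_API_URL = os.environ["PREFECT_API_URL"]
PREFECT_API_KEY = os.environ["PREFECT_API_KEY"]


async def check_if_current_flow_is_running_in_prefect() -> bool: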
    """Asynchronously determines if there is another instance of the current Prefect flow run with the same parameters.

    This function checks the current flow run against other running flows with the same name.
    It returns True if another flow run with the same parameters is found, otherwise False.

    Returns:
        bool: True if another instance with the same parameters is running, False otherwise.

    """
    flow_run_name = flow_run.flow_name
    flow_run_id = flow_run.id
    flow_params = flow_run.parameters
    print(f"Checking if {flow_run_name} is running")
    flow_runs_list = await get_prefect_flow_runs_running_by_flow_name(
        flow_name=flow_run_name
    )

    for flow_run_item in flow_runs_list:
        print(flow_run_item)
        if flow_run_item["id"] != flow_run_id and flow_params_match(
            flow_run_item["parameters"], flow_params
        ):
            return True

    return False


async def get_prefect_flow_runs_running_by_flow_name(flow_name: str) -> list[dict]:
    """This function returns a list of all running flow runs for a given flow name in Prefect Cloud.

    Args:
        flow_name (str): The name of the flow to get running flow runs for.

    Returns:
        list[dict]: A list of dictionaries representing the running flow runs for the given flow name.
    """
    url = f"{PREFECT_API_URL}/flow_runs/filter"

    headers = {
        "Authorization": f"Bearer {PREFECT_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "flow_runs": {
            "operator": "and_",
            "state": {
                "operator": "and_",
                "type": {"any_": ["RUNNING"]},
            },
        },
        "flows": {
            "name": {"any_": [flow_name]},
        },
    }
    async with httpx.AsyncClient(timeout=120, follow_redirects=True) as client:
        response = await client.post(url, headers=headers, json=payload)
        # Fail loudly on an HTTP error instead of treating an error body as a list of runs.
        response.raise_for_status()
        return response.json()


def flow_params_match(
    flow_run_item_params: dict[str, Any], flow_params: dict[str, Any]
) -> bool:
    """Checks if the parameters of the flow run item match the given flow parameters.

    Args:
        flow_run_item_params (dict[str, Any]): Parameters of a flow run item.
        flow_params (dict[str, Any]): Parameters of the current flow.

    Returns:
        bool: True if parameters match, False otherwise.
    """
    for key, value in flow_params.items():
        if key in flow_run_item_params:
            # Handling for lists containing Enums or other special cases
            if isinstance(value, list):
                new_list = [
                    item.value if isinstance(item, Enum) else item for item in value
                ]
                value = new_list  # noqa: PLW2901

            # Handling for individual Enum values
            if isinstance(value, Enum):
                value = value.value  # noqa: PLW2901

            if flow_run_item_params[key] != value:
                return False
    return True
Then at the beginning of the flow I do this.
    flow_check: bool = await check_if_current_flow_is_running_in_prefect()
    if flow_check:
        logger.info("The flow is cancelled")
        return Cancelled(message="The flow is cancelled")
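The duplicate check can be tried offline, without calling the API. The sketch below is a compact variant of the comparison logic above, not the code from the thread: `Mode`, `to_json_like`, `is_duplicate`, and the sample data are hypothetical, and the sample dictionaries only mimic the two fields the thread's code reads (`id` and `parameters`). It shows why Enum values have to be converted before comparing: the API returns parameters as plain JSON values.

```python
from enum import Enum
from typing import Any


class Mode(Enum):  # hypothetical parameter type, for illustration only
    FULL = "full"
    DELTA = "delta"


def to_json_like(value: Any) -> Any:
    """Replace Enum members (also inside lists) with their raw values."""
    if isinstance(value, Enum):
        return value.value
    if isinstance(value, list):
        return [to_json_like(item) for item in value]
    return value


def is_duplicate(
    current_id: str,
    current_params: dict[str, Any],
    running_runs: list[dict[str, Any]],
) -> bool:
    """True if another running run agrees on every parameter both runs define."""
    for run in running_runs:
        if run["id"] == current_id:
            continue  # the current run is itself in the RUNNING list
        other = run["parameters"]
        if all(
            other[key] == to_json_like(value)
            for key, value in current_params.items()
            if key in other
        ):
            return True
    return False


# Made-up sample data shaped like the fields used in the thread's code.
running = [
    {"id": "aaa", "parameters": {"mode": "full", "tables": ["a", "b"]}},
    {"id": "bbb", "parameters": {"mode": "full", "tables": ["a", "b"]}},
]
params = {"mode": Mode.FULL, "tables": ["a", "b"]}

print(is_duplicate("aaa", params, running))      # run "bbb" has the same parameters
print(is_duplicate("aaa", params, running[:1]))  # only the current run is listed
```

As in the thread's version, a parameter that is missing from the other run is ignored rather than treated as a mismatch, so two runs with disjoint parameter names count as duplicates.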