Here is a workaround for flow and subflow dependency calling.
# ask-community
Here is a workaround for flow and subflow dependency calling:
Copy code
import asyncio
import logging
import os
from functools import wraps
from time import time, sleep
from typing import Optional, Callable, Union
import pendulum
from prefect import flow, get_client, task
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterStartTime,
    DeploymentFilter,
    DeploymentFilterName
)
from prefect.client.schemas.sorting import FlowRunSort
from prefect.deployments import run_deployment

# Module-level logger for the demo flows; emit everything from DEBUG up.
logger = logging.getLogger('bia.demo_f')
logger.setLevel(logging.DEBUG)


class WaitDeploymentsError(Exception):
    """Raised when waiting on upstream deployments fails unexpectedly."""


async def fetch_latest_runs(client, deploy_names, flow_run_time, check_last_hours):
    """
    Fetch the latest flow runs for the specified deployment names.

    Parameters:
    - client (prefect.PrefectClient): Prefect client for API interactions.
    - deploy_names (Iterable[str]): Deployment names to fetch runs for.
    - flow_run_time (pendulum.DateTime): The starting time for flow run filtering.
    - check_last_hours (int): Number of hours before ``flow_run_time`` to
      include when filtering flow runs.

    Returns:
    - list: Flow Run model representations of the matching flow runs,
      newest first, at most one per deployment name.
    """
    _flow_run_time = flow_run_time.subtract(hours = check_last_hours)
    states = await client.read_flow_runs(
        flow_run_filter = FlowRunFilter(
            # The FlowRunFilter schema field for filtering on run start time is
            # `start_time` — the previous `flow_run_time` keyword is not a
            # recognized filter field, so the time constraint was never applied.
            start_time = FlowRunFilterStartTime(after_ = _flow_run_time)
        ),
        deployment_filter = DeploymentFilter(
            name = DeploymentFilterName(any_ = list(deploy_names))
        ),
        sort = FlowRunSort.START_TIME_DESC,
        limit = len(deploy_names)
    )
    return states


def wait_for_deployments(deployments, flow_run_time_utc_source=None, check_last_hours: int = 0,
                         deployment_timeout: int = 1500, retry_span: int = 5):
    """
    Decorator that waits for the given Prefect deployments to have a 'Completed'
    flow run before invoking the decorated flow function.

    Parameters:
    - deployments (Union[str, List[str]]): Deployment name(s) to wait for.
    - flow_run_time_utc_source (Union[pendulum.DateTime, Callable], optional):
      Source for the start time in UTC. It can be a UTC value, a synchronous
      function, or an asynchronous function. Default is None, which uses the
      current UTC time.
    - check_last_hours (int, optional): Number of hours before the start time to
      include when filtering flow runs. Default is 0.
    - deployment_timeout (int, optional): Maximum time (in seconds) to wait for
      the deployments. Default is 1500 seconds.
    - retry_span (int, optional): Time (in seconds) to wait between checks for
      the latest flow runs. Default is 5 seconds.

    Returns:
    - Callable: Decorator that produces an async wrapper around the flow function.
    """

    async def get_flow_run_time_utc(flow_run_time_source: Optional[
        Union[pendulum.DateTime, Callable[..., Union[pendulum.DateTime, asyncio.Future]]]] = None, *args,
                                    **kwargs) -> pendulum.DateTime:
        """
        Resolve the flow run start time in UTC from the provided source.

        Parameters:
        - flow_run_time_source: a pendulum.DateTime, a sync callable, an async
          callable returning pendulum.DateTime, or None (use "now").

        Returns:
        - pendulum.DateTime: Flow run time in UTC.

        Raises:
        - TypeError: If the source is not a UTC value or a (sync/async) callable
          returning pendulum.DateTime.
        """
        if flow_run_time_source is None:
            logger.info("flow_run_time_source is `None`, so using current timestamp.")
            return pendulum.now('UTC')
        elif isinstance(flow_run_time_source, pendulum.DateTime):
            logger.info("Using provided UTC value directly.")
            return flow_run_time_source
        elif callable(flow_run_time_source):
            func_name = getattr(flow_run_time_source, "__name__", "Unknown Function")
            logger.info(f"Using result from callable function: {func_name}")
            result = flow_run_time_source(*args, **kwargs)

            # Support async sources transparently.
            if asyncio.iscoroutine(result):
                logger.info("Asynchronous function result.")
                result = await result

            if isinstance(result, pendulum.DateTime):
                logger.info("Result is of type pendulum.DateTime.")
                return result
            raise TypeError(f"The result from the callable function {func_name} is not of type pendulum.DateTime. "
                            f"Got {type(result)} instead.")
        else:
            raise TypeError("Invalid flow_run_time_source. It should be a UTC value, a synchronous function, "
                            "or an asynchronous function.")

    def decorator(flow_func):
        @wraps(flow_func)
        async def wrapper(*args, **kwargs):
            """
            Wait for every listed deployment to complete, then invoke the
            decorated flow function. Returns False on timeout.
            """
            try:
                # Normalize locally instead of rebinding the decorator argument,
                # so concurrent wrappers don't share mutated state.
                names = deployments if isinstance(deployments, list) else [deployments]
                completed_deployments = set()
                pending_deployments = set(names)
                start_time_utc = await get_flow_run_time_utc(flow_run_time_utc_source)

                logger.info(f"Decorator: Start time for deployment tracking: {start_time_utc}")

                # Use the client as an async context manager so its underlying
                # HTTP resources are released when polling ends.
                async with get_client() as client:
                    while time() - start_time_utc.timestamp() < deployment_timeout:
                        states = await fetch_latest_runs(client, pending_deployments, start_time_utc, check_last_hours)
                        for latest_run in states:
                            _deployment = await client.read_deployment(latest_run.deployment_id)
                            _deployment_name = _deployment.name
                            _state = latest_run.state_name
                            logger.info(
                                f"Latest flow run for {_deployment_name} found at {latest_run.start_time} with state {_state}.")

                            if _state == 'Completed' and _deployment_name not in completed_deployments:
                                completed_deployments.add(_deployment_name)
                                pending_deployments.discard(_deployment_name)
                                logger.info(
                                    f"Flow run for deployment {_deployment_name} completed at {latest_run.start_time}.")

                                if not pending_deployments:
                                    logger.info(f"All deployments completed. Triggering {flow_func.__name__} ...")
                                    # We are already inside a running event loop:
                                    # awaiting is required here — asyncio.run()
                                    # would raise RuntimeError.
                                    if asyncio.iscoroutinefunction(flow_func):
                                        return await flow_func(*args, **kwargs)
                                    return flow_func(*args, **kwargs)

                        logger.info(f"Waiting for the latest flow runs ({round((time() - start_time_utc.timestamp()))}/{deployment_timeout} sec)")
                        logger.info(f"Here is pending deployments count: {len(pending_deployments)}")
                        # Non-blocking sleep: time.sleep() here would stall the
                        # whole event loop between polls.
                        await asyncio.sleep(retry_span)

                logger.error("The maximum loop time exceeded. Marking the flow as 'Failed'.")
                logger.error("Deployments not completed within the specified timeout. Taking appropriate action...")
                return False

            except Exception as e:
                # Chain the cause so the original traceback is preserved.
                raise WaitDeploymentsError(f"An unexpected error occurred in wait_for_deployments: {e}") from e

        return wrapper

    return decorator


@task
async def set_current_time_utc():
    """
    Store the current UTC time in the CURRENT_TIME_UTC environment variable.

    Obtains "now" in UTC via pendulum, stringifies it, and exports it so that
    downstream flows can share a single reference timestamp.

    Returns:
    None
    """
    now_utc = pendulum.now('UTC')
    value = str(now_utc)
    os.environ['CURRENT_TIME_UTC'] = value
    logger.info(f"CURRENT_TIME_UTC set to: {value}")


async def get_flow_run_time_utc_from_env(env_var_name='CURRENT_TIME_UTC', format_string='YYYY-MM-DDTHH:mm:ss.SSSSSSZ'):
    """
    Retrieve the UTC start time from the specified environment variable and format it using Pendulum.

    Parameters:
    - env_var_name (str, optional): The name of the environment variable. Default is 'CURRENT_TIME_UTC'.
    - format_string (str, optional): The format string for the time representation. Default is 'YYYY-MM-DDTHH:mm:ss.SSSSSSZ'.

    Returns:
    - pendulum.DateTime: The Pendulum representation of the UTC start time.
      Falls back to the current UTC time when the variable is unset or empty.
    """
    if os.environ.get(env_var_name):
        current_time_utc = pendulum.from_format(os.environ.get(env_var_name), format_string).in_tz('UTC')
        logging.info(f"Environment variable `{env_var_name}` found: {current_time_utc}")
        # Bug fix: the original did a bare `return` here, silently yielding None
        # to callers that expect a pendulum.DateTime.
        return current_time_utc
    else:
        current_time_utc = pendulum.now('UTC')
        logging.warning(f"Environment variable `{env_var_name}` not found or empty. Setting default time to {current_time_utc}")
        return current_time_utc


@task
def log_numbers_and_sleep(seconds: int, message: str):
    """Log the integers 1..seconds, block for `seconds` seconds, then return a greeting."""
    for count in range(seconds):
        logger.info(count + 1)
    logger.info(f"Sleeping for {seconds} seconds...")
    sleep(seconds)
    logger.info("Awake now!")
    return f"Hello from {message}!"


@task
def log_failure(message, context):
    """Log a warning for a failed flow, including the exception captured in `context`."""
    error_detail = context.get('exception', 'Unknown exception')
    logger.warning(f"A flow failed. Waiting for it to change state to completion. Error: {error_detail}")


@flow(name = "subflow")
def subflow():
    """Root flow with no upstream dependencies; runs for ~10 seconds."""
    result = log_numbers_and_sleep(10, "subflow")
    return result


@flow(name = "subflow1")
@wait_for_deployments(["subflow"], flow_run_time_utc_source=get_flow_run_time_utc_from_env)
def subflow1():
    """Runs once the `subflow` deployment has completed (~20 s of work)."""
    result = log_numbers_and_sleep(20, "subflow1")
    return result


@flow(name = "subflow2")
@wait_for_deployments(["subflow"], flow_run_time_utc_source=get_flow_run_time_utc_from_env)
def subflow2():
    """Runs once the `subflow` deployment has completed (~30 s of work)."""
    result = log_numbers_and_sleep(30, "subflow2")
    return result


@flow(name = "subflow3")
@wait_for_deployments(["subflow1", "subflow2"], flow_run_time_utc_source=get_flow_run_time_utc_from_env)
def subflow3():
    """Runs once both `subflow1` and `subflow2` have completed (~40 s of work)."""
    result = log_numbers_and_sleep(40, "subflow3")
    return result


@flow(name = "subflow4")
@wait_for_deployments(["subflow3"], flow_run_time_utc_source=get_flow_run_time_utc_from_env)
def subflow4():
    """Runs once the `subflow3` deployment has completed (~50 s of work)."""
    result = log_numbers_and_sleep(50, "subflow4")
    return result


@flow(name = "subflow5")
@wait_for_deployments(["subflow4"], flow_run_time_utc_source=get_flow_run_time_utc_from_env)
def subflow5():
    """Runs once the `subflow4` deployment has completed (~60 s of work)."""
    result = log_numbers_and_sleep(60, "subflow5")
    return result


@flow(name = "subflow6")
@wait_for_deployments(["subflow4"], flow_run_time_utc_source=get_flow_run_time_utc_from_env)
def subflow6():
    """Runs once the `subflow4` deployment has completed (~70 s of work)."""
    result = log_numbers_and_sleep(70, "subflow6")
    return result


@flow(name = "subflow7")
# Bug fix: the source was passed as the STRING 'get_flow_run_time_utc_from_env',
# which is neither a pendulum.DateTime nor a callable, so the decorator raised
# TypeError at runtime. Pass the function object itself, like the other subflows.
@wait_for_deployments(["subflow6"], flow_run_time_utc_source=get_flow_run_time_utc_from_env)
def subflow7():
    """Runs once the `subflow6` deployment has completed (~80 s of work)."""
    return log_numbers_and_sleep(80, "subflow7")


@flow
async def master():
    """Publish the shared start timestamp, then fire every subflow deployment at once."""
    # Export CURRENT_TIME_UTC so each subflow filters runs from the same instant.
    await set_current_time_utc()

    # Kick off all deployments concurrently; the wait_for_deployments decorator
    # inside each subflow enforces the actual ordering.
    deployment_names = [
        "subflow/subflow",
        "subflow1/subflow1",
        "subflow2/subflow2",
        "subflow3/subflow3",
        "subflow5/subflow5",
        "subflow6/subflow6",
        "subflow4/subflow4",
        "subflow7/subflow7",
    ]
    await asyncio.gather(*(run_deployment(name = n) for n in deployment_names))
Feel free to share thoughts and suggestions.