Suraj Mittal

    1 year ago
    Hi everyone, I wanted to know if it's possible to trigger a pipeline based on the states of other pipelines. Say I have pipeline-1, which runs on a daily schedule. I need to run pipeline-2 immediately after pipeline-1, but only if pipeline-1 succeeds.
    Kevin Kho

    1 year ago
    Hi @Suraj Mittal! Yes, this is possible. Let me find a script for you.
    Suraj Mittal

    1 year ago
    Amazing. Thank you, @Kevin Kho!
    Kevin Kho

    1 year ago
    The first subflow checks a condition and, if the condition is met (here: no new files were found), raises a SKIP signal. The “main flow” checks the result of that subflow and uses a case block to decide whether to run the other subflows.
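To make the control flow concrete, here is a minimal pure-Python model of the pattern. It is not Prefect code: `run_flow_1`, `master_flow`, and `flow_2` are hypothetical stand-ins, and the plain strings `"Success"`/`"Skipped"` replace Prefect's state objects. The point is only the gate: the downstream flow starts when the first flow really succeeded, and a skip does not count.

```python
from typing import Callable, List


def run_flow_1(files: List[str]) -> str:
    """Hypothetical stand-in for flow_1: report a skip when there is no new data."""
    if not files:
        return "Skipped"
    return "Success"


def master_flow(files: List[str], downstream: Callable[[], str]) -> List[str]:
    """Run flow_1, then start the downstream flow only if flow_1 really succeeded."""
    started: List[str] = []
    state = run_flow_1(files)
    # Plays the role of `with case(skipped, False):` in the script
    if state == "Success":
        started.append(downstream())
    return started


def flow_2() -> str:
    """Hypothetical stand-in for the downstream flow."""
    return "flow_2"


print(master_flow(["new_file.csv"], flow_2))  # ['flow_2']
print(master_flow([], flow_2))                # []
```

In the real script the same decision is split across Prefect tasks: `StartFlowRun` runs the first flow, `check_previous_skip` inspects it, and `case` guards the downstream `StartFlowRun` calls.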
    import prefect
    from prefect import Flow, task
    from prefect.tasks.prefect import StartFlowRun
    from prefect.engine import signals
    from prefect.client.client import Client
    from prefect.tasks.control_flow.case import case
    
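    @task
    def check_bucket():
        # Placeholder: replace with the logic that checks for new files
        data = []

        # SKIP marks this task as Skipped; by default its downstream tasks are skipped too
        if len(data) == 0:
            raise signals.SKIP("No new files found")
        return data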
    
    
    @task
    def register_data(data):
        logger = prefect.context.get("logger")
        # Log each newly found data point
        for point in data:
            logger.info(point)
    
    @task
    def preprocess_data():
        x = 1
        logger = prefect.context.get("logger")
        logger.info(f"Executed {x}")
        return x
    
    @task
    def check_previous_skip(flow_run_signal):
        # With wait=True, StartFlowRun ends by raising a state signal whose
        # message starts with the ID of the child flow run
        flow_run_id = flow_run_signal.state.message.split(" ")[0]
        flow_run = Client().get_flow_run_info(flow_run_id)

        # Find the check_bucket task run inside the child flow run
        state = [t for t in flow_run.task_runs if "check_bucket" in t.task_slug][0].state

        if state.is_skipped():
            prefect.context.get("logger").info("There was a skip in the previous flow")
            return True
        return False
    
    with Flow("flow_1") as flow1:
        data = check_bucket()
        register_data(data)
    
    with Flow("flow_2") as flow2:
        preprocess_data()
    
    # Register block
    flow1.register("testing-result")
    flow2.register("testing-result")
    
    # wait=True blocks until the child flow run finishes
    start = StartFlowRun(project_name="testing-result", wait=True)
    
    with Flow('master-flow') as flow:
        run_id = start(flow_name="flow_1")
    
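        # True if check_bucket was skipped in flow_1, otherwise False
        skipped = check_previous_skip(run_id)

        # Start downstream flows only when flow_1 was not skipped;
        # add further start(...) calls here for other flows
        with case(skipped, False):
            start(flow_name="flow_2")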
        
    flow.run()
    This is close to what you want. You would modify the first flow so that it raises SKIP whenever pipeline-2 should not run; the master flow then checks for that skip before starting the next flow.
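The least obvious step in the script is `flow_run_signal.state.message.split(...)[0]`. With `wait=True`, `StartFlowRun` finishes by raising a state signal whose message, as far as I can tell, begins with the child flow run ID (something like `"<flow_run_id> finished in state <State>"`; treat that exact format as an assumption). The sketch below mimics the lookup with stand-in dataclasses instead of Prefect's `Client`; `TaskRunInfo`, `FlowRunInfo`, and the sample ID `1234-abcd` are hypothetical.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TaskRunInfo:
    """Stand-in for one task-run record of a flow run (hypothetical)."""
    task_slug: str
    state: str


@dataclass
class FlowRunInfo:
    """Stand-in for the flow-run record the client returns (hypothetical)."""
    task_runs: List[TaskRunInfo]


def flow_run_id_from_message(message: str) -> str:
    """Take the first space-separated token, as check_previous_skip does."""
    return message.split(" ")[0]


def task_was_skipped(info: FlowRunInfo, slug_fragment: str) -> bool:
    """Return True if the first task run whose slug contains slug_fragment was skipped."""
    matches = [t for t in info.task_runs if slug_fragment in t.task_slug]
    if not matches:
        raise LookupError(f"no task run matching {slug_fragment!r}")
    return matches[0].state == "Skipped"


# Hypothetical signal message and flow run record
message = '1234-abcd finished in state <Success: "All reference tasks succeeded.">'
info = FlowRunInfo(task_runs=[
    TaskRunInfo(task_slug="check_bucket-1", state="Skipped"),
    TaskRunInfo(task_slug="register_data-1", state="Skipped"),
])

print(flow_run_id_from_message(message))       # 1234-abcd
print(task_was_skipped(info, "check_bucket"))  # True
```

Raising `LookupError` with a message when no task matches is a small design choice; indexing `[0]` on an empty list, as the script does, fails with a less descriptive `IndexError`.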
    Suraj Mittal

    1 year ago
    Just to check that I'm understanding this right: I should be able to omit all the flow_1 code above if I know the names of flow_1 and its project, correct?
    Kevin Kho

    1 year ago
    Yes, that sounds right.
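Put differently, the master flow only refers to flow_1 by its registered name and project, so flow_1's definition can live in a separate script. The toy model below illustrates that idea with a plain dictionary standing in for the backend's flow registry; `REGISTRY`, `register`, and `start_by_name` are hypothetical names, not Prefect API.

```python
from typing import Callable, Dict, Tuple

# Hypothetical stand-in for the flows already registered with the backend
REGISTRY: Dict[Tuple[str, str], Callable[[], str]] = {}


def register(project: str, name: str, flow: Callable[[], str]) -> None:
    """Record a flow under (project, name), as a separate registration script would."""
    REGISTRY[(project, name)] = flow


def start_by_name(project: str, name: str) -> str:
    """Launch a flow knowing only its project and name."""
    try:
        flow = REGISTRY[(project, name)]
    except KeyError:
        raise LookupError(f"flow {name!r} is not registered in project {project!r}") from None
    return flow()


# Done once, e.g. in the script that defines and registers flow_1
register("testing-result", "flow_1", lambda: "Success")

# The master flow never imports flow_1's code, only its name and project
print(start_by_name("testing-result", "flow_1"))  # Success
```

The only requirement is that flow_1 was registered beforehand under that name and project.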
    Suraj Mittal

    1 year ago
    Amazing. Thanks a lot.
    merlin

    1 year ago
    @Suraj Mittal This doesn't seem like the usual way of expressing dependencies. Why is a workaround needed in this case? I thought a flow of flows would wait on its dependencies. I must be missing something; can you explain how this use case differs from a typical flow?
    Kevin Kho

    1 year ago
    A flow of flows will wait for dependencies, but a SKIP in a flow is currently treated as SUCCESS, which triggers the downstream flows. Passing state and results between subflows is something we’re working to improve.
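A small pure-Python model shows why the explicit check is needed. In Prefect 1.x, `Skipped` is a subclass of `Success`, so a flow whose tasks were all skipped still finishes in a successful state, and a gate that only asks "did the upstream flow succeed?" lets the next flow start. The state strings and helper names below are simplified, hypothetical stand-ins, not Prefect's implementation.

```python
from typing import List

# Simplified model: as in Prefect 1.x, a Skipped state counts as a kind of Success
SUCCESS_LIKE = {"Success", "Skipped"}


def flow_state(task_states: List[str]) -> str:
    """Collapse task states into one flow state; skips count as successes."""
    return "Success" if all(s in SUCCESS_LIKE for s in task_states) else "Failed"


def downstream_runs(upstream_flow_state: str) -> bool:
    """Default-style gate: start the next flow whenever the upstream flow succeeded."""
    return upstream_flow_state == "Success"


def guarded_downstream_runs(task_states: List[str]) -> bool:
    """The workaround: additionally require that no upstream task was skipped."""
    return downstream_runs(flow_state(task_states)) and "Skipped" not in task_states


# check_bucket raised SKIP, so register_data was skipped as well
skipped_run = ["Skipped", "Skipped"]
print(flow_state(skipped_run))                   # Success
print(downstream_runs(flow_state(skipped_run)))  # True
print(guarded_downstream_runs(skipped_run))      # False
```

The guarded version corresponds to the `check_previous_skip` task combined with `case(skipped, False)` in the script.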