Hi Everyone, Wanted to know if its possible to tri...
# ask-community
s
Hi Everyone, Wanted to know if its possible to trigger a pipeline based on states of other pipelines? Say I have a pipeline-1 which runs on a daily schedule. I need to run a pipeline 2 immediately after pipeline 1 only if the pipeline 1 succeeds.
k
Hi @Suraj Mittal! Yes this is possible. Let me find a script for you.
🙌 1
s
Amazing. Thank you @Kevin Kho
k
The first subflow checks a condition and if the condition is met, it returns a
SKIP
. The “main flow” checks the result of the subflow and uses a case statement to decide to run other subflows.
Copy code
import prefect
from prefect import Flow, task, Parameter
from prefect.engine.state import Skipped
from prefect.tasks.prefect import StartFlowRun
from prefect.engine import signals
from prefect.client.client import Client
from prefect.tasks.control_flow.case import case
import datetime

@task
def check_bucket():

    # Logic to check for new files
    data = []

    if len(data) == 0:
        raise signals.SKIP()
    else:
        pass
    return data


@task
def register_data(data):
    if len(data) > 1:
        for point in data:
            logger = prefect.context.get("logger")
            <http://logger.info|logger.info>(point)
        return

@task
def preprocess_data():
    x = 1
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>(f"Executed {x}")
    return x

@task
def check_previous_skip(flow_run_signal):
    client = Client()
    flow_state = client.get_flow_run_info(flow_run_signal.state.message.split(' ')[0])
    state = [t for t in flow_state.task_runs if "check_bucket" in t.task_slug][0].state

    if state.is_skipped():
        logger = prefect.context.get("logger")
        <http://logger.info|logger.info>("There was a skip in previous Flow")
        return True
    else:
        return False

with Flow("flow_1") as flow1:
    data = check_bucket()
    register_data(data)

with Flow("flow_2") as flow2:
    preprocess_data()

# Register block
flow1.register("testing-result")
flow2.register("testing-result")

start = StartFlowRun(project_name="testing-result", wait = True)

with Flow('master-flow') as flow:
    run_id = start(flow_name="flow_1")

    # True or False
    skipped = check_previous_skip(run_id)

    # Insert other flows here
    with case(skipped, False):
        start(flow_name="flow_2")
        start(flow_name="flow_2")
    
flow.run()
This is close to what you want. You want to modify the first flow to raise a SKIP and pass that to the main flow.
s
Just to check if i'm understanding this right. I should be able to omit all the flow_1 code from the above code if I know the name of the flow_1 and the project correct?
k
Yes that sounds right.
s
Amazing. Thanks a lot.
m
@Suraj Mittal So this seems like not the usual way of including dependencies, why is a workaround needed in this case? I thought a flow of flows would wait on dependencies. I must be missing something, can you explain the use-case as different than a typical flow?
k
Flow of flows will wait for dependencies but
SKIP
for a flow is treated as
SUCCESS
at the moment, which triggers downstream flows. Passing state and results between subflows is something we’re working to improve.