<@ULVA73B9P> can you help me finish this example? ...
# marvin-ai
a
@Marvin can you help me finish this example? I want to add a trigger for flow2 so that it starts when flow1 completes sucessfully. I am deploying the flows programatically in python like this:
Copy code
deploy1 = flow1.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow1,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.DEFAULT,
    tags=[CONSTANTS.PREFECT_TAG],
)

deploy2 = flow2.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow2,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.LOW,
    tags=[CONSTANTS.PREFECT_TAG],
    # triggers=[
    # TODO: add a trigger based on flow with name CONSTANTS.DEPLOYMENT_NAMES.flow1 completing
    # ]
)

deployables=(deploy1,deploy2)

deploy_ids = deploy(
    *deployables,
    work_pool_name=CONSTANTS.DEFAULT_WORKPOOL,
    build=False,
    push=False,
    image=f"{docker_image}:{docker_tag}",
)
m
To trigger
flow2
upon the successful completion of
flow1
, you can utilize Prefect's state hooks to achieve this flow dependency. Here's a concise example of how you might implement this: 1. Define a function that triggers
flow2
when
flow1
completes successfully. 2. Use the
on_completion
hook to attach this function to
flow1
. Here's how you might modify your code:
Copy code
python
from prefect import flow
from prefect.deployments import run_deployment

def trigger_flow2_on_completion(flow, flow_run, state):
    if state.is_successful():
        run_deployment(CONSTANTS.DEPLOYMENT_NAMES.flow2)

@flow(on_completion=[trigger_flow2_on_completion])
def flow1():
    # your flow1 logic here

# Deploy flow1 with the trigger
deploy1 = flow1.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow1,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.DEFAULT,
    tags=[CONSTANTS.PREFECT_TAG],
)

# Deploy flow2 normally
deploy2 = flow2.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow2,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.LOW,
    tags=[CONSTANTS.PREFECT_TAG],
)

deployables = (deploy1, deploy2)

deploy_ids = deploy(
    *deployables,
    work_pool_name=CONSTANTS.DEFAULT_WORKPOOL,
    build=False,
    push=False,
    image=f"{docker_image}:{docker_tag}",
)
Key Points: - The
trigger_flow2_on_completion
function checks if the state of
flow1
is successful before triggering
flow2
. - The
on_completion
hook is attached directly to
flow1
, which allows you to programmatically trigger
flow2
without modifying `flow2`'s deployment. For further details, you can explore these resources: - Prefect Documentation on Flow Dependencies - GitHub Example of Flow Chaining This approach fits your preference for an imperative programming style and should integrate well with your existing deployment strategy.
a
@Marvin Is it possible to achieve the same thing using prefect automations and triggers and actions?
m
Yes, you can achieve the same behavior using Prefect's automations, triggers, and actions. Prefect automations provide a flexible framework to automatically take actions based on specific events, such as the completion of a flow run. Here's how you can set it up: 1. Create an Automation: Navigate to the Automations page in Prefect Cloud. 2. Define a Trigger: Set up a trigger condition that listens for the completion of
flow1
. You can use a
prefect.flow-run.Completed
event to detect when
flow1
finishes successfully. 3. Configure Actions: Set the action to start
flow2
when the trigger condition is met. This can be done using the
RunDeployment
action. For more detailed steps, you can refer to the following resources: - Automations and Triggers Documentation - Custom Triggers Documentation This approach allows you to manage dependencies and automate flow executions without modifying the flow definitions themselves. It's an excellent way to keep your flow logic clean and maintainable while leveraging Prefect's orchestration capabilities.
a
@Marvin in the above example can you show me how to pass the outcome of flow1 as a parameter to flow2
m
To pass the outcome of
flow1
as a parameter to
flow2
, you can leverage Prefect's ability to run deployments with specific parameters. Here's a concise guide on how to achieve this: 1. Capture the Outcome of
flow1
: Ensure that
flow1
returns a result that you want to pass to
flow2
. 2. Trigger
flow2
with Parameters
: Use the
run_deployment
function to trigger
flow2
, passing the result of
flow1
as a parameter. Here's an example modification to your setup:
Copy code
python
from prefect import flow
from prefect.deployments import run_deployment

def trigger_flow2_on_completion(flow, flow_run, state):
    if state.is_successful():
        # Extract the result from flow1's state
        result = state.result()
        # Pass the result as a parameter to flow2
        run_deployment(
            CONSTANTS.DEPLOYMENT_NAMES.flow2,
            parameters={"result_from_flow1": result}
        )

@flow(on_completion=[trigger_flow2_on_completion])
def flow1():
    # your flow1 logic here
    # Return the result you want to pass to flow2
    return "some_result"

@flow
def flow2(result_from_flow1):
    print(f"Received result from flow1: {result_from_flow1}")
    # your flow2 logic here

# Deploy flow1 with the trigger
deploy1 = flow1.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow1,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.DEFAULT,
    tags=[CONSTANTS.PREFECT_TAG],
)

# Deploy flow2 normally
deploy2 = flow2.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow2,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.LOW,
    tags=[CONSTANTS.PREFECT_TAG],
)

deployables = (deploy1, deploy2)

deploy_ids = deploy(
    *deployables,
    work_pool_name=CONSTANTS.DEFAULT_WORKPOOL,
    build=False,
    push=False,
    image=f"{docker_image}:{docker_tag}",
)
Key Points: - Use
state.result()
to capture the output from
flow1
. - Pass this output as a parameter to
flow2
using the
run_deployment
function. For additional information, you can refer to the Prefect documentation on writing flows and flow chaining examples.
a
@Marvin is state.is_successful() a real method or a bug?
m
The method
state.is_successful()
does not appear to be a standard method in Prefect's API for checking if a flow or task run was successful. Instead, you should check if the state of the flow is
COMPLETED
, which indicates that the run did not encounter any errors and returned successfully. Here's how you can modify the check:
Copy code
python
def trigger_flow2_on_completion(flow, flow_run, state):
    if state.name == "COMPLETED":
        # Extract the result from flow1's state
        result = state.result()
        # Pass the result as a parameter to flow2
        run_deployment(
            CONSTANTS.DEPLOYMENT_NAMES.flow2,
            parameters={"result_from_flow1": result}
        )
In this example,
state.name == "COMPLETED"
is used to verify that the flow run finished successfully. For more detailed information, you can refer to Prefect's documentation on managing states.