Alastair
03/07/2025, 9:39 AMdeploy1 = flow1.to_deployment(
name=CONSTANTS.DEPLOYMENT_NAMES.flow1,
job_variables=shared_job_variables,
work_queue_name=CONSTANTS.QUEUES.DEFAULT,
tags=[CONSTANTS.PREFECT_TAG],
)
deploy2 = flow2.to_deployment(
name=CONSTANTS.DEPLOYMENT_NAMES.flow2,
job_variables=shared_job_variables,
work_queue_name=CONSTANTS.QUEUES.LOW,
tags=[CONSTANTS.PREFECT_TAG],
# triggers=[
# TODO: add a trigger based on flow with name CONSTANTS.DEPLOYMENT_NAMES.flow1 completing
# ]
)
deployables=(deploy1,deploy2)
deploy_ids = deploy(
*deployables,
work_pool_name=CONSTANTS.DEFAULT_WORKPOOL,
build=False,
push=False,
image=f"{docker_image}:{docker_tag}",
)Marvin
03/07/2025, 9:40 AMflow2 upon the successful completion of flow1, you can utilize Prefect's state hooks to achieve this flow dependency. Here's a concise example of how you might implement this:
1. Define a function that triggers flow2 when flow1 completes successfully.
2. Use the on_completion hook to attach this function to flow1.
Here's how you might modify your code:
python
from prefect import flow
from prefect.deployments import run_deployment
def trigger_flow2_on_completion(flow, flow_run, state):
if state.is_successful():
run_deployment(CONSTANTS.DEPLOYMENT_NAMES.flow2)
@flow(on_completion=[trigger_flow2_on_completion])
def flow1():
# your flow1 logic here
# Deploy flow1 with the trigger
deploy1 = flow1.to_deployment(
name=CONSTANTS.DEPLOYMENT_NAMES.flow1,
job_variables=shared_job_variables,
work_queue_name=CONSTANTS.QUEUES.DEFAULT,
tags=[CONSTANTS.PREFECT_TAG],
)
# Deploy flow2 normally
deploy2 = flow2.to_deployment(
name=CONSTANTS.DEPLOYMENT_NAMES.flow2,
job_variables=shared_job_variables,
work_queue_name=CONSTANTS.QUEUES.LOW,
tags=[CONSTANTS.PREFECT_TAG],
)
deployables = (deploy1, deploy2)
deploy_ids = deploy(
*deployables,
work_pool_name=CONSTANTS.DEFAULT_WORKPOOL,
build=False,
push=False,
image=f"{docker_image}:{docker_tag}",
)
Key Points:
- The trigger_flow2_on_completion function checks if the state of flow1 is successful before triggering flow2.
- The on_completion hook is attached directly to flow1, which allows you to programmatically trigger flow2 without modifying `flow2`'s deployment.
For further details, you can explore these resources:
- Prefect Documentation on Flow Dependencies
- GitHub Example of Flow Chaining
This approach fits your preference for an imperative programming style and should integrate well with your existing deployment strategy.Alastair
03/07/2025, 9:41 AMMarvin
03/07/2025, 9:42 AMflow1. You can use a prefect.flow-run.Completed event to detect when flow1 finishes successfully.
3. Configure Actions: Set the action to start flow2 when the trigger condition is met. This can be done using the RunDeployment action.
For more detailed steps, you can refer to the following resources:
- Automations and Triggers Documentation
- Custom Triggers Documentation
This approach allows you to manage dependencies and automate flow executions without modifying the flow definitions themselves. It's an excellent way to keep your flow logic clean and maintainable while leveraging Prefect's orchestration capabilities.Alastair
03/07/2025, 12:24 PMMarvin
03/07/2025, 12:25 PMflow1 as a parameter to flow2, you can leverage Prefect's ability to run deployments with specific parameters. Here's a concise guide on how to achieve this:
1. Capture the Outcome of flow1: Ensure that flow1 returns a result that you want to pass to flow2.
2. Trigger flow2 with Parameters: Use the run_deployment function to trigger flow2, passing the result of flow1 as a parameter.
Here's an example modification to your setup:
python
from prefect import flow
from prefect.deployments import run_deployment
def trigger_flow2_on_completion(flow, flow_run, state):
if state.is_successful():
# Extract the result from flow1's state
result = state.result()
# Pass the result as a parameter to flow2
run_deployment(
CONSTANTS.DEPLOYMENT_NAMES.flow2,
parameters={"result_from_flow1": result}
)
@flow(on_completion=[trigger_flow2_on_completion])
def flow1():
# your flow1 logic here
# Return the result you want to pass to flow2
return "some_result"
@flow
def flow2(result_from_flow1):
print(f"Received result from flow1: {result_from_flow1}")
# your flow2 logic here
# Deploy flow1 with the trigger
deploy1 = flow1.to_deployment(
name=CONSTANTS.DEPLOYMENT_NAMES.flow1,
job_variables=shared_job_variables,
work_queue_name=CONSTANTS.QUEUES.DEFAULT,
tags=[CONSTANTS.PREFECT_TAG],
)
# Deploy flow2 normally
deploy2 = flow2.to_deployment(
name=CONSTANTS.DEPLOYMENT_NAMES.flow2,
job_variables=shared_job_variables,
work_queue_name=CONSTANTS.QUEUES.LOW,
tags=[CONSTANTS.PREFECT_TAG],
)
deployables = (deploy1, deploy2)
deploy_ids = deploy(
*deployables,
work_pool_name=CONSTANTS.DEFAULT_WORKPOOL,
build=False,
push=False,
image=f"{docker_image}:{docker_tag}",
)
Key Points:
- Use state.result() to capture the output from flow1.
- Pass this output as a parameter to flow2 using the run_deployment function.
For additional information, you can refer to the Prefect documentation on writing flows and flow chaining examples.Alastair
03/07/2025, 2:00 PMMarvin
03/07/2025, 2:01 PMstate.is_successful() does not appear to be a standard method in Prefect's API for checking if a flow or task run was successful. Instead, you should check if the state of the flow is COMPLETED, which indicates that the run did not encounter any errors and returned successfully.
Here's how you can modify the check:
python
def trigger_flow2_on_completion(flow, flow_run, state):
if state.name == "COMPLETED":
# Extract the result from flow1's state
result = state.result()
# Pass the result as a parameter to flow2
run_deployment(
CONSTANTS.DEPLOYMENT_NAMES.flow2,
parameters={"result_from_flow1": result}
)
In this example, state.name == "COMPLETED" is used to verify that the flow run finished successfully. For more detailed information, you can refer to Prefect's documentation on managing states.