Alastair
03/07/2025, 9:39 AM
```python
deploy1 = flow1.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow1,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.DEFAULT,
    tags=[CONSTANTS.PREFECT_TAG],
)
deploy2 = flow2.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow2,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.LOW,
    tags=[CONSTANTS.PREFECT_TAG],
    # triggers=[
    #     TODO: add a trigger based on flow with name CONSTANTS.DEPLOYMENT_NAMES.flow1 completing
    # ]
)
deployables = (deploy1, deploy2)
deploy_ids = deploy(
    *deployables,
    work_pool_name=CONSTANTS.DEFAULT_WORKPOOL,
    build=False,
    push=False,
    image=f"{docker_image}:{docker_tag}",
)
```
Marvin
03/07/2025, 9:40 AM
To trigger `flow2` upon the successful completion of `flow1`, you can use Prefect's state hooks to express this flow dependency. Here's a concise example of how you might implement this:
1. Define a function that triggers `flow2` when `flow1` completes successfully.
2. Use the `on_completion` hook to attach this function to `flow1`.
Here's how you might modify your code:
```python
from prefect import flow
from prefect.deployments import run_deployment


def trigger_flow2_on_completion(flow, flow_run, state):
    if state.is_successful():
        # run_deployment expects "<flow name>/<deployment name>" or a deployment ID
        run_deployment(CONSTANTS.DEPLOYMENT_NAMES.flow2)


@flow(on_completion=[trigger_flow2_on_completion])
def flow1():
    # your flow1 logic here
    ...


# Deploy flow1 with the trigger
deploy1 = flow1.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow1,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.DEFAULT,
    tags=[CONSTANTS.PREFECT_TAG],
)
# Deploy flow2 normally
deploy2 = flow2.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow2,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.LOW,
    tags=[CONSTANTS.PREFECT_TAG],
)
deployables = (deploy1, deploy2)
deploy_ids = deploy(
    *deployables,
    work_pool_name=CONSTANTS.DEFAULT_WORKPOOL,
    build=False,
    push=False,
    image=f"{docker_image}:{docker_tag}",
)
```
Key Points:
- The `trigger_flow2_on_completion` function checks if the state of `flow1` is successful before triggering `flow2`.
- The `on_completion` hook is attached directly to `flow1`, which allows you to programmatically trigger `flow2` without modifying `flow2`'s deployment.
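One practical detail with this hook pattern: `run_deployment` polls until the triggered run finishes unless it is given `timeout=0`, and it addresses a deployment as `"<flow name>/<deployment name>"`. The sketch below wraps both in a small hook factory. `make_trigger_hook` and its `run` parameter are hypothetical names introduced here so the hook can be exercised without a Prefect server; in real use `run` would be `prefect.deployments.run_deployment`.

```python
from typing import Any, Callable


def make_trigger_hook(
    flow_name: str, deployment_name: str, run: Callable[..., Any]
) -> Callable[..., None]:
    """Build an on_completion hook that starts another deployment."""
    # run_deployment identifies a deployment as "<flow name>/<deployment name>".
    target = f"{flow_name}/{deployment_name}"

    def hook(flow: Any, flow_run: Any, state: Any) -> None:
        # timeout=0 returns as soon as the run is created instead of
        # polling until the downstream run finishes.
        run(name=target, timeout=0)

    return hook


# Stand-in for prefect.deployments.run_deployment, so the sketch runs offline.
calls: list[dict] = []


def fake_run_deployment(**kwargs: Any) -> None:
    calls.append(kwargs)


hook = make_trigger_hook("flow2", "nightly", fake_run_deployment)
hook(flow=None, flow_run=None, state=None)
print(calls)
```

With Prefect installed, the hook could be attached as `@flow(on_completion=[make_trigger_hook("flow2", CONSTANTS.DEPLOYMENT_NAMES.flow2, run_deployment)])`, assuming the downstream flow is actually named `flow2`.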
For further details, you can explore these resources:
- Prefect Documentation on Flow Dependencies
- GitHub Example of Flow Chaining
This approach fits your preference for an imperative programming style and should integrate well with your existing deployment strategy.
Alastair
03/07/2025, 9:41 AM
Marvin
03/07/2025, 9:42 AM
[...] `flow1`. You can use a `prefect.flow-run.Completed` event to detect when `flow1` finishes successfully.
3. Configure Actions: Set the action to start `flow2` when the trigger condition is met. This can be done using the `RunDeployment` action.
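The event-driven setup described here can also be written down as data. The sketch below builds a trigger specification as a plain dictionary, using the field names (`enabled`, `match`, `expect`, `match_related`) that Prefect's event triggers use. `completion_trigger` is a hypothetical helper, and matching on the related deployment's `prefect.resource.name` is an assumption worth checking against the events your own `flow1` runs emit.

```python
from typing import Any


def completion_trigger(upstream_deployment: str) -> dict[str, Any]:
    """Describe 'fire when a run of upstream_deployment completes'."""
    return {
        "enabled": True,
        # Only consider events emitted by flow runs.
        "match": {"prefect.resource.id": "prefect.flow-run.*"},
        # Fire on the Completed event, i.e. a successful finish.
        "expect": ["prefect.flow-run.Completed"],
        # Narrow to runs of the upstream deployment (assumed name match).
        "match_related": {
            "prefect.resource.role": "deployment",
            "prefect.resource.name": upstream_deployment,
        },
    }


trigger = completion_trigger("flow1-deployment")
print(trigger["expect"])
```

The same fields can, as far as I know, be passed to `prefect.events.DeploymentEventTrigger` and supplied through the `triggers=` argument of `flow2.to_deployment(...)`, which is where the commented-out TODO in the original snippet sits. Treat that wiring as unverified until it has been tried against your Prefect version.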
For more detailed steps, you can refer to the following resources:
- Automations and Triggers Documentation
- Custom Triggers Documentation
This approach allows you to manage dependencies and automate flow executions without modifying the flow definitions themselves. It's an excellent way to keep your flow logic clean and maintainable while leveraging Prefect's orchestration capabilities.
Alastair
03/07/2025, 12:24 PM
Marvin
03/07/2025, 12:25 PM
To pass the result of `flow1` as a parameter to `flow2`, you can use Prefect's ability to run deployments with specific parameters. Here's a concise guide on how to achieve this:
1. Capture the Outcome of `flow1`: Ensure that `flow1` returns a result that you want to pass to `flow2`.
2. Trigger `flow2` with Parameters: Use the `run_deployment` function to trigger `flow2`, passing the result of `flow1` as a parameter.
Here's an example modification to your setup:
```python
from prefect import flow
from prefect.deployments import run_deployment


def trigger_flow2_on_completion(flow, flow_run, state):
    if state.is_successful():
        # Extract the result from flow1's state
        result = state.result()
        # Pass the result as a parameter to flow2.
        # run_deployment expects "<flow name>/<deployment name>" or a deployment ID
        run_deployment(
            CONSTANTS.DEPLOYMENT_NAMES.flow2,
            parameters={"result_from_flow1": result},
        )


@flow(on_completion=[trigger_flow2_on_completion])
def flow1():
    # your flow1 logic here
    # Return the result you want to pass to flow2
    return "some_result"


@flow
def flow2(result_from_flow1):
    print(f"Received result from flow1: {result_from_flow1}")
    # your flow2 logic here


# Deploy flow1 with the trigger
deploy1 = flow1.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow1,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.DEFAULT,
    tags=[CONSTANTS.PREFECT_TAG],
)
# Deploy flow2 normally
deploy2 = flow2.to_deployment(
    name=CONSTANTS.DEPLOYMENT_NAMES.flow2,
    job_variables=shared_job_variables,
    work_queue_name=CONSTANTS.QUEUES.LOW,
    tags=[CONSTANTS.PREFECT_TAG],
)
deployables = (deploy1, deploy2)
deploy_ids = deploy(
    *deployables,
    work_pool_name=CONSTANTS.DEFAULT_WORKPOOL,
    build=False,
    push=False,
    image=f"{docker_image}:{docker_tag}",
)
```
Key Points:
- Use `state.result()` to capture the output from `flow1`.
- Pass this output as a parameter to `flow2` using the `run_deployment` function.
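Because `run_deployment` sends parameters to the Prefect API, whatever `flow1` returns has to survive JSON serialization. A minimal sketch of a guard that could run inside the hook before triggering `flow2` follows; `as_flow2_parameters` is a hypothetical helper, and `result_from_flow1` is the parameter name used in the example above.

```python
import json
from typing import Any


def as_flow2_parameters(result: Any) -> dict[str, Any]:
    """Wrap flow1's result as flow2 parameters, failing early if it cannot be JSON-encoded."""
    try:
        json.dumps(result)
    except (TypeError, ValueError) as exc:
        raise TypeError(
            f"flow1 result of type {type(result).__name__} is not JSON-serializable"
        ) from exc
    return {"result_from_flow1": result}


print(as_flow2_parameters({"rows": 3}))
```

Failing inside the hook with a clear message is usually easier to debug than a serialization error raised while the flow run is being created. For larger or non-JSON results, passing a reference (a path or key) instead of the value itself is a common alternative.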
For additional information, you can refer to the Prefect documentation on writing flows and flow chaining examples.
Alastair
03/07/2025, 2:00 PM
Marvin
03/07/2025, 2:01 PM
`state.is_successful()` is not a standard method in Prefect's API for checking whether a flow or task run was successful. Instead, check whether the state's type is `COMPLETED`, which indicates that the run did not encounter any errors and returned successfully. The `State` object provides `is_completed()` for this.
Here's how you can modify the check:
```python
from prefect.deployments import run_deployment


def trigger_flow2_on_completion(flow, flow_run, state):
    if state.is_completed():
        # Extract the result from flow1's state
        result = state.result()
        # Pass the result as a parameter to flow2
        run_deployment(
            CONSTANTS.DEPLOYMENT_NAMES.flow2,
            parameters={"result_from_flow1": result},
        )
```
In this example, `state.is_completed()` verifies that the flow run finished successfully. For more detailed information, you can refer to Prefect's documentation on managing states.