Zivile
07/04/2025, 6:38 AM
Marvin
07/04/2025, 6:38 AM
Marvin
07/04/2025, 6:39 AM
```python
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=5))
def task_1():
    return "Result from task 1"


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=5))
def task_2():
    return "Result from task 2"


@flow
def flow_1():
    return task_1()


@flow
def flow_2():
    return task_2()


@flow
def dependent_flow():
    # Calling a flow inside another flow runs it as a subflow and blocks
    # until it finishes, so each call is already a synchronization point
    result_1 = flow_1()
    result_2 = flow_2()
    # A failed subflow raises above, so this only runs if both succeeded
    print("Both prerequisite tasks completed!")
    # Your flow logic here...


if __name__ == "__main__":
    dependent_flow()
```
Key points about this solution:
1. Each prerequisite task is in its own flow (`flow_1` and `flow_2`)
2. The dependent flow calls both prerequisite flows as subflows
3. Each subflow call blocks until that flow finishes, so the calls themselves create the synchronization point
4. The dependent flow will only proceed once both prerequisite tasks have completed successfully
This approach has several advantages:
- It maintains flow independence while allowing coordination
- Each flow can still be run (or deployed) on its own, outside the dependent flow
- If either prerequisite task fails, the dependent flow will not proceed
- You can still track each flow independently in the Prefect UI
Remember that this creates a hard dependency - if either of the prerequisite tasks fails, your dependent flow will not proceed. Make sure this is the behavior you want, and consider implementing appropriate error handling if needed.
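To see what a hard dependency means in isolation, here is a plain-Python model using only the standard library's `concurrent.futures`. It is not Prefect code, and `prerequisite_1`, `prerequisite_2` and `dependent_step` are hypothetical names. The dependent step collects every prerequisite result, and any prerequisite exception propagates and stops it:

```python
from concurrent.futures import ThreadPoolExecutor


def prerequisite_1():
    return "Result from task 1"


def prerequisite_2():
    return "Result from task 2"


def dependent_step(prerequisites):
    """Run all prerequisites concurrently and continue only if every one succeeds."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fn) for fn in prerequisites]
        # result() blocks until that prerequisite finishes and re-raises its
        # exception, so one failure stops the dependent step (a hard dependency)
        results = [future.result() for future in futures]
    # Synchronization point: every prerequisite has finished successfully here
    return f"Proceeding with {len(results)} prerequisite results"
```

If you want a soft dependency instead, catch the exception from each `result()` call and decide per prerequisite whether to continue.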
You can find more details about task dependencies in the Prefect documentation.
Zivile
07/04/2025, 6:41 AM
Marvin
07/04/2025, 6:42 AM
Marvin
07/04/2025, 6:42 AM
```python
from prefect import flow, task


@task
def task_1():
    return "Result from task 1"


@task
def task_2():
    return "Result from task 2"


@flow
def flow_1():
    return task_1()


@flow
def flow_2():
    return task_2()


@flow
def parent_flow():
    # This will run sequentially by default
    result_1 = flow_1()
    result_2 = flow_2()
    # Your dependent logic here
    print("Both prerequisite flows completed!")
    # ... rest of your flow


if __name__ == "__main__":
    parent_flow()
```
2. Using Flow Run Wait
```python
import asyncio

from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment


@task
def wait_for_flow_runs(flow_run_ids, poll_interval=10):
    async def wait_for_runs():
        async with get_client() as client:
            for run_id in flow_run_ids:
                # Poll the API until the run reaches a final state
                # (Completed, Failed, Cancelled or Crashed)
                while True:
                    flow_run = await client.read_flow_run(run_id)
                    if flow_run.state and flow_run.state.is_final():
                        break
                    await asyncio.sleep(poll_interval)

    asyncio.run(wait_for_runs())


@flow
def dependent_flow():
    # Trigger the prerequisite flows
    flow_1_run = run_deployment(
        name="flow-1/deployment-name",
        timeout=0,  # Don't wait for completion
    )
    flow_2_run = run_deployment(
        name="flow-2/deployment-name",
        timeout=0,
    )
    # Wait for both flows to complete
    wait_for_flow_runs([flow_1_run.id, flow_2_run.id])
    print("Both prerequisite flows completed!")
    # Your flow logic here...
```
Key points for Prefect 2.x:
- There's no `wait_for_tasks()` function (or any other direct mechanism) for cross-flow dependencies
- Dependencies between tasks must be within the same flow
- You need to use either subflows or the Prefect API to coordinate between flows
Recommendations:
1. If possible, use the first approach (parent flow) as it's simpler and more reliable
2. If you must keep the flows separate (e.g., they're deployed independently), use the second approach with flow run waiting
3. Prefect 3.x does not add a direct cross-flow task dependency either, so upgrading alone won't remove the need for one of these approaches
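The flow-run-waiting approach boils down to polling a run's state until it is final. Below is a minimal, Prefect-free sketch of that loop with an optional timeout. `read_state` is a hypothetical callable standing in for an API lookup, and the state names mirror Prefect's final state types:

```python
import time

# Final state types; the names mirror Prefect's StateType values
FINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def wait_for_final_state(read_state, poll_interval=5.0, timeout=None,
                         sleep=time.sleep, clock=time.monotonic):
    """Call read_state() until it returns a final state and return that state.

    Raises TimeoutError if `timeout` seconds pass first (None waits forever).
    """
    deadline = None if timeout is None else clock() + timeout
    while True:
        state = read_state()
        if state in FINAL_STATES:
            return state
        if deadline is not None and clock() >= deadline:
            raise TimeoutError(f"still {state} after {timeout} seconds")
        sleep(poll_interval)
```

Injecting `sleep` and `clock` keeps the loop testable. In real code you would also check whether the returned state is `COMPLETED` and raise otherwise, so a failed prerequisite stops the dependent flow.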
You can find more details about task dependencies in Prefect 2.x in the documentation.
Zivile
07/04/2025, 6:52 AM
Marvin
07/04/2025, 6:52 AM
Marvin
07/04/2025, 6:53 AM
```python
# module_1/flow_1.py
from prefect import flow, task


@task
def task_1():
    return "Result from task 1"


@flow(name="flow_1")
def flow_1():
    return task_1()
```
```python
# module_2/flow_2.py
from prefect import flow, task


@task
def task_2():
    return "Result from task 2"


@flow(name="flow_2")
def flow_2():
    return task_2()
```
```python
# module_3/dependent_flow.py
import asyncio

from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment


@task
def wait_for_flow_runs(flow_run_ids, poll_interval=10):
    async def wait_for_runs():
        async with get_client() as client:
            for run_id in flow_run_ids:
                # Poll until the run reaches a final state
                while True:
                    flow_run = await client.read_flow_run(run_id)
                    if flow_run.state and flow_run.state.is_final():
                        break
                    await asyncio.sleep(poll_interval)
                # Failed, cancelled and crashed runs all stop the dependent flow
                if not flow_run.state.is_completed():
                    raise Exception(f"Flow run {run_id} ended in state {flow_run.state.name}")

    asyncio.run(wait_for_runs())


@flow(name="dependent_flow")
def dependent_flow():
    # Deployment names have the form "<flow name>/<deployment name>";
    # the flows above are named "flow_1" and "flow_2"
    flow_1_run = run_deployment(
        name="flow_1/your-deployment-name-1",
        timeout=0,  # Don't wait for completion here
    )
    flow_2_run = run_deployment(
        name="flow_2/your-deployment-name-2",
        timeout=0,
    )
    # Wait for both flows to complete
    wait_for_flow_runs([flow_1_run.id, flow_2_run.id])
    print("Both prerequisite flows completed!")
    # Your flow logic here...


if __name__ == "__main__":
    dependent_flow()
```
To make this work, you need to:
1. Deploy the Flows First
```bash
# Deploy flow_1
cd module_1
prefect deployment build flow_1.py:flow_1 -n "your-deployment-name-1" -q default
prefect deployment apply flow_1-deployment.yaml
# Deploy flow_2
cd ../module_2
prefect deployment build flow_2.py:flow_2 -n "your-deployment-name-2" -q default
prefect deployment apply flow_2-deployment.yaml
```
2. Start a Prefect Agent
```bash
prefect agent start -q default
```
3. Run Your Dependent Flow
```bash
cd ../module_3
python dependent_flow.py
```
Key points:
- Each flow can live in its own module and be deployed independently
- The dependent flow uses the Prefect API to track the other flows
- Error handling is included (the dependent flow will raise an exception if either prerequisite flow fails)
- You can monitor all flows in the Prefect UI
- The flows can be on different schedules or triggered independently
Alternative Approach using Imports:
If the flows are all running in the same Python environment and you don't need them to be fully independent, you could also import them directly:
```python
# module_3/dependent_flow.py
# Assumes module_1 and module_2 are importable (e.g. run from the project root)
from prefect import flow
from module_1.flow_1 import flow_1
from module_2.flow_2 import flow_2


@flow
def dependent_flow():
    result_1 = flow_1()
    result_2 = flow_2()
    # Your flow logic here...
```
Choose the approach that best fits your needs:
- Use the deployment approach if the flows need to be independently deployable and schedulable
- Use the direct import approach if the flows are always run together and you want simpler code
Remember to adjust the deployment names and queue names according to your actual setup. You can find these in the Prefect UI or your deployment YAML files.
Zivile
07/04/2025, 6:58 AM
Marvin
07/04/2025, 6:58 AM
Marvin
07/04/2025, 7:00 AM
```python
# automations.py
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowFilter,
    FlowFilterName,
    FlowRunFilter,
    FlowRunFilterEndTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.sorting import FlowRunSort
from prefect.deployments import run_deployment


async def latest_completed_run_id(client, flow_name, lookback_minutes=60):
    """Return the ID of the newest COMPLETED run of `flow_name` that ended within the lookback period"""
    lookback_time = datetime.now(timezone.utc) - timedelta(minutes=lookback_minutes)
    flow_runs = await client.read_flow_runs(
        flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])),
        flow_run_filter=FlowRunFilter(
            state=FlowRunFilterState(
                type=FlowRunFilterStateType(any_=[StateType.COMPLETED])
            ),
            end_time=FlowRunFilterEndTime(after_=lookback_time),
        ),
        sort=FlowRunSort.START_TIME_DESC,
        limit=1,
    )
    return flow_runs[0].id if flow_runs else None


async def monitor_and_trigger():
    """Monitor flows and trigger the dependent flow when conditions are met"""
    # IDs of the prerequisite runs that already triggered the dependent flow
    last_used = (None, None)
    async with get_client() as client:
        while True:
            # Replace these with your actual flow names
            run_1 = await latest_completed_run_id(client, "flow_1")
            run_2 = await latest_completed_run_id(client, "flow_2")
            # Only trigger for a new pair of completed runs, so completions
            # still inside the lookback window are not counted twice
            if run_1 and run_2 and run_1 != last_used[0] and run_2 != last_used[1]:
                # Trigger your dependent flow ("<flow name>/<deployment name>")
                await run_deployment(
                    name="dependent_flow/your-deployment-name",
                    timeout=0,
                )
                last_used = (run_1, run_2)
                await asyncio.sleep(300)  # 5 minutes
            else:
                await asyncio.sleep(30)  # Check every 30 seconds


if __name__ == "__main__":
    asyncio.run(monitor_and_trigger())
```
2. Your dependent flow in the third module remains simple:
```python
# module_3/dependent_flow.py
from prefect import flow


@flow(name="dependent_flow")
def dependent_flow():
    print("Running dependent flow after prerequisites completed!")
    # Your flow logic here...
```
3. Deploy all your flows:
```bash
# Deploy your dependent flow
cd module_3
prefect deployment build dependent_flow.py:dependent_flow -n "your-deployment-name" -q default
prefect deployment apply dependent_flow-deployment.yaml
```
4. Run the automation script:
```bash
python automations.py
```
Alternative Approach using Prefect Automations UI:
You can also set this up through the Prefect UI:
1. Go to the Prefect UI
2. Navigate to Automations
3. Create a new automation with:
   - Trigger: "Flow run state updates"
     - Filters:
       - Flow name: [your first flow name]
       - State: Completed
   - Add another trigger (AND condition):
     - Flow name: [your second flow name]
     - State: Completed
   - Action: "Run deployment"
     - Select your dependent flow deployment
Key Points:
- The automation script continuously monitors the completion of both flows
- It only triggers the dependent flow when both prerequisite flows have completed successfully
- The lookback period (default 60 minutes) ensures you're only considering recent flow runs
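- Because completed runs stay inside the 60-minute lookback window, the 5-minute pause after a trigger does not by itself prevent repeat triggers; the script also has to remember which prerequisite runs it has already acted on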
- Error handling and logging can be added as needed
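The trigger decision in the monitoring script can be isolated into a pure function, which makes the duplicate-trigger problem easy to reason about and test. This is a hypothetical helper (`should_trigger` is not part of Prefect, and the run IDs are plain strings): it fires only when both prerequisites have a completed run inside the lookback window and neither run has already been used.

```python
from datetime import datetime, timedelta, timezone


def should_trigger(latest_1, latest_2, last_used, now, lookback=timedelta(minutes=60)):
    """Decide whether to start the dependent flow.

    latest_1 / latest_2: (run_id, end_time) of the newest COMPLETED run of each
    prerequisite flow, or None if there is none.
    last_used: (run_id_1, run_id_2) pair that already triggered the dependent flow.
    """
    if latest_1 is None or latest_2 is None:
        return False
    (id_1, end_1), (id_2, end_2) = latest_1, latest_2
    # Both completions must fall inside the lookback window...
    recent = now - end_1 <= lookback and now - end_2 <= lookback
    # ...and both runs must be new, so an old completion is never reused
    fresh = id_1 != last_used[0] and id_2 != last_used[1]
    return recent and fresh


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    run_1 = ("run-a", now - timedelta(minutes=10))
    run_2 = ("run-b", now - timedelta(minutes=20))
    print(should_trigger(run_1, run_2, (None, None), now))  # True
    print(should_trigger(run_1, run_2, ("run-a", "run-b"), now))  # False
```

Requiring both runs to be new is a design choice: if `flow_1` runs twice before `flow_2` runs again, the dependent flow waits for the next `flow_2` completion rather than pairing the new `flow_1` run with an already-used one.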
Best Practices:
1. Add proper error handling and logging to the automation script
2. Consider using environment variables for flow and deployment names
Marvin
07/04/2025, 7:00 AM
Zivile
07/04/2025, 7:02 AM
Marvin
07/04/2025, 7:02 AM
Marvin
07/04/2025, 7:03 AM
```python
# automations.py
# Assumption: this requires a 2.19 install that includes the Automation SDK
# (prefect.automations); if yours does not, use the UI steps below instead.
import asyncio
from datetime import timedelta

from prefect.automations import Automation
from prefect.client.orchestration import get_client
from prefect.events.actions import RunDeployment
from prefect.events.schemas.automations import CompoundTrigger, EventTrigger


def flow_completed(flow_name):
    # Fires when a run of the named flow emits prefect.flow-run.Completed;
    # the flow name is a label on the event's related "flow" resource
    return EventTrigger(
        expect={"prefect.flow-run.Completed"},
        match_related={
            "prefect.resource.role": "flow",
            "prefect.resource.name": flow_name,  # Replace with your flow names
        },
        posture="Reactive",
        threshold=1,
    )


async def create_automation():
    # Look up the dependent deployment by "<flow name>/<deployment name>"
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(
            "dependent_flow/your-deployment"  # Replace with your deployment
        )

    automation = Automation(
        name="run-dependent-flow",
        trigger=CompoundTrigger(
            require="all",  # both prerequisite flows must complete...
            within=timedelta(hours=24),  # ...within this window; adjust as needed
            triggers=[flow_completed("flow_1"), flow_completed("flow_2")],
        ),
        actions=[RunDeployment(source="selected", deployment_id=deployment.id)],
    )
    return await automation.create()


if __name__ == "__main__":
    asyncio.run(create_automation())
```
And your flows in their respective modules would look like this:
```python
# module_1/flow_1.py
from prefect import flow


@flow(name="flow_1")
def flow_1():
    # Your flow logic here
    pass
```
```python
# module_2/flow_2.py
from prefect import flow


@flow(name="flow_2")
def flow_2():
    # Your flow logic here
    pass
```
```python
# module_3/dependent_flow.py
from prefect import flow


@flow(name="dependent_flow")
def dependent_flow():
    print("Running dependent flow after prerequisites completed!")
    # Your flow logic here
    pass
```
To set this up:
1. First, deploy all your flows:
```bash
# From module_1 directory
prefect deployment build flow_1.py:flow_1 -n "deployment-1" -q default
prefect deployment apply flow_1-deployment.yaml
# From module_2 directory
prefect deployment build flow_2.py:flow_2 -n "deployment-2" -q default
prefect deployment apply flow_2-deployment.yaml
# From module_3 directory
prefect deployment build dependent_flow.py:dependent_flow -n "your-deployment" -q default
prefect deployment apply dependent_flow-deployment.yaml
```
2. Run the automation script:
```bash
python automations.py
```
Alternatively, you can set this up through the Prefect UI:
1. Go to the Prefect UI
2. Navigate to Automations
3. Click "Create Automation"
4. Add two triggers:
   - First trigger:
     - Type: "Flow run state updates"
     - Flow: [your first flow name]
     - State: Completed
   - Second trigger:
     - Type: "Flow run state updates"
     - Flow: [your second flow name]
     - State: Completed
5. Set the trigger logic to "ALL triggers must match"
6. Add an action:
   - Type: "Run deployment"
   - Select your dependent flow deployment
Key Benefits of this approach in 2.19:
- Built-in support for compound trigger conditions (all of several flows completing)
- More reliable than a custom monitoring script: there is no polling loop that can miss runs and no extra process to keep running
- Can be managed through the UI
- Supports time windows for trigger conditions
Best Practices:
1. Set appropriate time windows for your triggers (the `within` parameter)
2. Use meaningful names for your deployments
3. Consider adding tags to your flows for better organization
4. Monitor the automation status in the UI
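Choosing the `within` window is easier with a concrete picture of the condition. Below is a simplified model of "all triggers within a time window", assuming the check compares the most recent completion of each required flow. It is an illustration only, not Prefect's actual evaluation logic, and `all_within` and the flow names are placeholders:

```python
from datetime import datetime, timedelta


def all_within(completions, required, within):
    """Return True if every required flow has completed and the most recent
    completion of each falls inside one window of length `within`.

    completions maps a flow name to a list of completion datetimes.
    """
    latest = []
    for name in required:
        times = completions.get(name)
        if not times:
            return False  # this flow has not completed yet
        latest.append(max(times))
    return max(latest) - min(latest) <= within


if __name__ == "__main__":
    start = datetime(2025, 7, 4, 6, 0)
    seen = {"flow_1": [start], "flow_2": [start + timedelta(hours=2)]}
    print(all_within(seen, ["flow_1", "flow_2"], timedelta(hours=24)))  # True
    print(all_within(seen, ["flow_1", "flow_2"], timedelta(hours=1)))  # False
```

A window shorter than the usual gap between the two prerequisite runs means the condition is never met, which is why it should follow your expected run frequencies.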
Remember that:
- All flows must be deployed before setting up the automation
- The automation will only trigger for future flow runs
- You can modify or disable the automation through the UI if needed
- The time window (`within` parameter) should be set based on your expected flow run frequencies
This approach is much more robust than the
Zivile
07/04/2025, 7:04 AM
Marvin
07/04/2025, 7:04 AM
Marvin
07/04/2025, 7:05 AM
`asyncio` and polling) would be more prone to issues like missed events, requiring more error handling, and needing additional infrastructure to keep the monitoring script running continuously. The native automation approach eliminates these concerns by handling everything within Prefect's infrastructure.
Zivile
07/04/2025, 7:05 AM
Marvin
07/04/2025, 7:05 AM
Marvin
07/04/2025, 7:08 AM
```python
# automations.py
import asyncio
from datetime import timedelta

from prefect.automations import Automation
from prefect.client.orchestration import get_client
from prefect.events.actions import RunDeployment
from prefect.events.schemas.automations import CompoundTrigger, EventTrigger


def flow_completed(flow_name):
    # Fires when a run of the named flow emits prefect.flow-run.Completed;
    # the flow name is a label on the event's related "flow" resource
    return EventTrigger(
        expect={"prefect.flow-run.Completed"},
        match_related={
            "prefect.resource.role": "flow",
            "prefect.resource.name": flow_name,  # Replace with your flow names
        },
        posture="Reactive",
        threshold=1,
    )


async def create_automation():
    # Look up the dependent deployment by "<flow name>/<deployment name>"
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(
            "dependent_flow/your-deployment"  # Replace with your deployment
        )

    automation = Automation(
        name="my-automation",
        trigger=CompoundTrigger(
            require="all",  # both prerequisite flows must complete...
            within=timedelta(hours=24),  # ...within this window; adjust as needed
            triggers=[flow_completed("flow_1"), flow_completed("flow_2")],
        ),
        actions=[RunDeployment(source="selected", deployment_id=deployment.id)],
    )
    # Saves the automation on the server, where it also appears in the UI
    return await automation.create()


if __name__ == "__main__":
    asyncio.run(create_automation())
```
Your flows would be defined normally in their respective modules:
```python
# module_1/flow_1.py
from prefect import flow


@flow(name="flow_1")
def flow_1():
    # Your flow logic here
    pass
```
```python
# module_2/flow_2.py
from prefect import flow


@flow(name="flow_2")
def flow_2():
    # Your flow logic here
    pass
```
```python
# module_3/dependent_flow.py
from prefect import flow


@flow(name="dependent_flow")
def dependent_flow():
    print("Running dependent flow after prerequisites completed!")
    # Your flow logic here
    pass
```
To set this up:
1. First, deploy all your flows:
```bash
# From module_1 directory
prefect deploy flow_1.py:flow_1 -n "deployment-1"
# From module_2 directory
prefect deploy flow_2.py:flow_2 -n "deployment-2"
# From module_3 directory
prefect deploy dependent_flow.py:dependent_flow -n "your-deployment"
```
2. Run the automation script:
```bash
python automations.py
```
Alternatively, you can set this up through the Prefect UI:
1. Go to the Prefect UI
2. Navigate to Automations
3. Click "Create Automation"
4. Configure two event triggers:
   - First trigger:
     - Type: "Deployment Event"
     - Deployment: [your first flow deployment]
     - Event: Completed
   - Second trigger:
     - Type: "Deployment Event"
     - Deployment: [your second flow deployment]
     - Event: Completed
5. Set the trigger logic to "ALL triggers must match"
6. Add an action to run your dependent flow deployment
Key Differences in Prefect 3.x:
- The deployment creation syntax is simpler (`prefect deploy` instead of `prefect deployment build` followed by `prefect deployment apply`)
Best Practices for Prefect 3.x:
1. Use descriptive names for your automations
2. Set appropriate time windows based on your workflow needs
3. Consider adding tags to your flows for better organization
4. Monitor automations through the UI
5. Use version control for your automation definitions
6. Test your triggers with small test flows first
Remember:
- All flows must be deployed before setting up the automation
- The automation will only trigger for future flow runs
- You can manage automations through both the UI and code
- The time window should match your expected flow run patterns
- Automations can be enabled/disabled through the UI as needed
This approach maintains all the benefits of the Prefect
Zivile
07/04/2025, 7:46 AM
Marvin
07/04/2025, 7:46 AM
Marvin
07/04/2025, 7:48 AM
`DeploymentEventTrigger` class I mentioned can't be used the way I showed it. In Prefect 3.x it is only attached to a deployment as a trigger (for example through the `triggers` argument when serving or deploying a flow), so it can't be created and saved on its own. Here are other ways to set up flow dependencies in Prefect 3.x:
1. Using the Prefect UI (Recommended approach):
   - Go to the Prefect UI
   - Navigate to Automations
   - Create a new automation
   - Select "Event" as the trigger type
   - Use `prefect.flow-run.Completed` as the event name
   - Add filters for specific flow names
   - Set the action to run your dependent flow deployment
2. Using Python code:
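```python
import asyncio

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.events.clients import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter

REQUIRED_FLOWS = {"flow_1", "flow_2"}  # Replace with your flow names


@flow
def monitor_and_trigger():
    async def wait_for_flows():
        completed = set()
        # Subscribe to flow run completion events
        event_filter = EventFilter(event=EventNameFilter(name=["prefect.flow-run.Completed"]))
        async with get_events_subscriber(filter=event_filter) as subscriber:
            async for event in subscriber:
                # The event's own resource is the flow *run*; the flow name
                # is on the related resource with role "flow"
                for related in event.related:
                    if related.role == "flow" and related.name in REQUIRED_FLOWS:
                        completed.add(related.name)
                # Trigger the dependent flow once both flows have completed
                if completed >= REQUIRED_FLOWS:
                    async with get_client() as client:
                        await client.create_flow_run_from_deployment(
                            deployment_id="your-deployment-id",
                            name="Triggered flow run",
                        )
                    completed.clear()

    asyncio.run(wait_for_flows())
```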
However, for your specific use case of wanting to trigger a flow after two others complete, I would recommend a simpler approach using Prefect 3.x's native flow capabilities:
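```python
# dependent_flow.py
import asyncio

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName
from prefect.client.schemas.sorting import FlowRunSort


@flow
def dependent_flow():
    async def wait_for_flow_runs(flow_names, poll_interval=10):
        async with get_client() as client:
            for flow_name in flow_names:
                # Most recent run of this flow
                latest_run = await client.read_flow_runs(
                    flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])),
                    sort=FlowRunSort.START_TIME_DESC,
                    limit=1,
                )
                if not latest_run:
                    continue
                # Poll until that run reaches a final state
                run_id = latest_run[0].id
                while True:
                    flow_run = await client.read_flow_run(run_id)
                    if flow_run.state and flow_run.state.is_final():
                        break
                    await asyncio.sleep(poll_interval)

    # Wait for both prerequisite flows (replace with your flow names)
    asyncio.run(wait_for_flow_runs(["flow_1", "flow_2"]))
    print("Both prerequisite flows completed!")
    # Your flow logic here...


if __name__ == "__main__":
    dependent_flow()
```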
Key points about Prefect 3.x event handling:
1. It uses a more streamlined event system compared to 2.x
2. Events follow the pattern `prefect.flow-run.{State}` (e.g., `prefect.flow-run.Completed`)
3. You can subscribe to events from Python with an events subscriber (`prefect.events.clients.get_events_subscriber`)
4. The UI provides a more user-friendly way to set up these automations
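To make the event pattern concrete: in a flow-run event, the event's own resource is the flow run, while the flow's name sits on a related resource with role `flow`. The helper below works on a plain dictionary assumed to mirror the JSON shape of a Prefect event (`event`, `resource` and `related` keys with `prefect.resource.*` labels). `completed_flow_name` is a hypothetical name, and the IDs and run name in the test are made up:

```python
def completed_flow_name(event, state="Completed"):
    """Return the flow name if `event` is a prefect.flow-run.<state> event, else None."""
    if event.get("event") != f"prefect.flow-run.{state}":
        return None
    # The flow is a related resource; event["resource"] is the flow run itself
    for related in event.get("related", []):
        if related.get("prefect.resource.role") == "flow":
            return related.get("prefect.resource.name")
    return None
```

Matching on the related `flow` resource rather than on `event["resource"]` is what lets a listener filter by flow name.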
Best Practices:
1. Use the UI for setting up simple flow dependencies
2. Use code-based solutions when you need more complex logic
3. Consider using tags to better organize and filter your flows
4. Add appropriate error handling and logging
5. Set reasonable timeouts for flow run waiting
Remember that Prefect 3.x is designed to be more streamlined than 2.x, so while some classes like `DeploymentEventTrigger` only work in specific places (here, as triggers attached to a deployment), the functionality is still available through other means.
Zivile
07/04/2025, 7:51 AM
Marvin
07/04/2025, 7:51 AM
Marvin
07/04/2025, 8:03 AM
```python
# module_1/flow_1.py
from prefect import flow, task
from prefect.events import emit_event


@task(name="task_1_important")
def important_task_1():
    result = "task 1 result"
    # Emit an event when this specific task completes. Every event resource
    # needs a "prefect.resource.id" label; the id below is a placeholder.
    emit_event(
        event="task-1-completed",
        resource={
            "prefect.resource.id": "my-project.task.task_1_important",
            "prefect.resource.name": "task_1_important",
        },
        payload={"status": "completed"},
    )
    return result


@flow(name="flow_1")
def flow_1():
    result = important_task_1()
    # other tasks...
    return result
```
```python
# module_2/flow_2.py
from prefect import flow, task
from prefect.events import emit_event


@task(name="task_2_important")
def important_task_2():
    result = "task 2 result"
    # Emit an event when this specific task completes. Every event resource
    # needs a "prefect.resource.id" label; the id below is a placeholder.
    emit_event(
        event="task-2-completed",
        resource={
            "prefect.resource.id": "my-project.task.task_2_important",
            "prefect.resource.name": "task_2_important",
        },
        payload={"status": "completed"},
    )
    return result


@flow(name="flow_2")
def flow_2():
    result = important_task_2()
    # other tasks...
    return result
```
2. Create your dependent flow that waits for both task events:
```python
# module_3/dependent_flow.py
import asyncio

from prefect import flow
from prefect.events.clients import get_events_subscriber
from prefect.events.filters import EventFilter, EventNameFilter


@flow(name="dependent_flow")
def dependent_flow():
    async def wait_for_task_events():
        completed_tasks = set()
        # Subscribe to our custom task completion events
        event_filter = EventFilter(
            event=EventNameFilter(name=["task-1-completed", "task-2-completed"])
        )
        async with get_events_subscriber(filter=event_filter) as subscriber:
            async for event in subscriber:
                # Reads the "prefect.resource.name" label set in emit_event()
                task_name = event.resource.name
                completed_tasks.add(task_name)
                print(f"Task {task_name} completed")
                # If both tasks are complete, we can proceed
                if {"task_1_important", "task_2_important"}.issubset(completed_tasks):
                    print("Both required tasks have completed!")
                    return True

    # Wait for both tasks to complete
    asyncio.run(wait_for_task_events())
    print("Starting dependent flow work...")
    # Your flow logic here...


if __name__ == "__main__":
    dependent_flow()
```
3. Deploy all your flows:
```bash
# From module_1 directory
prefect deploy flow_1.py:flow_1 -n "deployment-1"
# From module_2 directory
prefect deploy flow_2.py:flow_2 -n "deployment-2"
# From module_3 directory
prefect deploy dependent_flow.py:dependent_flow -n "deployment-3"
```
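The event-waiting loop in step 2 waits forever if one of the events never arrives. One way to bound it is sketched here with plain `asyncio`: an `asyncio.Queue` stands in for the event stream, `wait_for_events` is a hypothetical helper, and the consumer is wrapped in `asyncio.wait_for`:

```python
import asyncio


async def wait_for_events(queue, required, timeout):
    """Read event names from `queue` until every name in the set `required` has arrived.

    Raises asyncio.TimeoutError if that takes longer than `timeout` seconds.
    """
    seen = set()

    async def consume():
        # Keep reading until `seen` contains every required name
        while not required <= seen:
            name = await queue.get()
            if name in required:
                seen.add(name)  # unrelated events are ignored
        return seen

    return await asyncio.wait_for(consume(), timeout)
```

In the Prefect version you could wrap `wait_for_task_events()` the same way (for example `asyncio.wait_for(wait_for_task_events(), timeout=3600)`) and decide whether a timeout should fail the dependent flow.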
Alternative Approach using Task Run States:
If you prefer not to use custom events, you can monitor task states directly:
```python
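# module_3/dependent_flow.py
import asyncio

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    TaskRunFilter,
    TaskRunFilterName,
    TaskRunFilterState,
    TaskRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.sorting import TaskRunSort


def completed_runs_of(task_name):
    # Task run names are derived from the task name, so match on it
    return TaskRunFilter(
        name=TaskRunFilterName(like_=task_name),
        state=TaskRunFilterState(type=TaskRunFilterStateType(any_=[StateType.COMPLETED])),
    )


@flow(name="dependent_flow")
def dependent_flow():
    async def wait_for_tasks():
        async with get_client() as client:
            while True:
                # Get the latest completed run of each task
                task_runs_1 = await client.read_task_runs(
                    task_run_filter=completed_runs_of("task_1_important"),
                    sort=TaskRunSort.EXPECTED_START_TIME_DESC,
                    limit=1,
                )
                task_runs_2 = await client.read_task_runs(
                    task_run_filter=completed_runs_of("task_2_important"),
                    sort=TaskRunSort.EXPECTED_START_TIME_DESC,
                    limit=1,
                )
                if task_runs_1 an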