Alex Ilyin
07/30/2025, 6:20 PM

Marvin
07/30/2025, 6:21 PM
1. Using `return_state=True`: This is the primary way to prevent task failures from affecting downstream tasks. Here's an example:
```python
from prefect import flow, task

@task
def task_1():
    raise ValueError("This will fail")

@task
def task_2():
    print("I will still run!")

@flow
def my_flow():
    # return_state=True is passed at call time (not in the decorator);
    # the call returns a State instead of raising, so the flow continues
    state_1 = task_1(return_state=True)  # This task will fail but won't stop the flow
    task_2()  # This will still execute

my_flow()
```
2. Using `allow_failure`: Prefect's `allow_failure` annotation (imported from `prefect`) wraps an upstream future so the downstream task runs even if that upstream task failed:
```python
from prefect import allow_failure, flow, task

@task
def task_1():
    raise ValueError("This will fail")

@task
def task_2():
    print("I will still run!")

@flow
def my_flow():
    future_1 = task_1.submit()
    # The Failed state is passed downstream but, wrapped in allow_failure,
    # it won't prevent task_2 from executing
    task_2.submit(wait_for=[allow_failure(future_1)])

my_flow()
```
3. Parallel Execution: If you want tasks to run completely independently of each other (in parallel), you can use `submit()`:
```python
from prefect import flow, task

@task
def task_1():
    print("Running task 1")

@task
def task_2():
    print("Running task 2")

@flow
def my_flow():
    # Both tasks will start executing immediately
    future_1 = task_1.submit()
    future_2 = task_2.submit()

    # If you need the results later:
    result_1 = future_1.result()
    result_2 = future_2.result()

my_flow()
```
A few important notes:
- With `return_state=True`, the task call returns a Prefect State object instead of the actual return value (see the sketch below for how to inspect it)
- With `allow_failure`, the upstream task can still end in a Failed state, but it won't raise an exception in the flow and the downstream task runs anyway
- Using `submit()` is great for parallel execution, but be careful with resource usage if submitting many tasks at once
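For that first note, here's a minimal sketch of inspecting a returned State, reusing `task_1` from the first example above; `result(raise_on_failure=False)` returns the exception from a failed run instead of raising it:
```python
from prefect import flow, task

@task
def task_1():
    raise ValueError("This will fail")

@flow
def my_flow():
    state_1 = task_1(return_state=True)
    if state_1.is_failed():
        # Returns the underlying exception rather than raising it
        print(f"task_1 failed with: {state_1.result(raise_on_failure=False)}")
    else:
        print(f"task_1 returned: {state_1.result()}")

my_flow()
```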
These patterns give you flexibility in handling task dependencies and failures. Choose the approach that best fits your specific use case:
- Use `return_state=True` if you want to handle the state explicitly in your flow
- Use `allow_failure` for the simplest way to continue execution despite failures
- Use `submit()` when you want true parallel execution without dependencies
Remember that while these patterns allow tasks to run independently, you should still consider whether this is the best approach for your workflow. Sometimes, task dependencies are important for data consistency or resource management.

Alex Ilyin
07/30/2025, 6:49 PM

Marvin
07/30/2025, 6:50 PM
1. Using `submit()` for Everything: This is the most straightforward way to achieve complete independence:
```python
from prefect import flow, task

@task
def task_1(i):
    print(f"Running task 1 instance {i}")
    # This could even fail and task_2 would still run

@task
def task_2():
    print("Running task 2")

@flow
def my_flow():
    # Submit multiple task_1 instances
    futures_1 = [task_1.submit(i) for i in range(3)]

    # task_2 will start immediately, not waiting for the task_1 instances
    future_2 = task_2.submit()

    # If you need the results later, you can collect them:
    # results_1 = [f.result() for f in futures_1]
    # result_2 = future_2.result()

my_flow()
```
2. Using Task Groups: If you want more control over groups of tasks:
```python
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=5))
def task_1(i):
    print(f"Running task 1 instance {i}")

@task
def task_2():
    print("Running task 2 - I run independently!")

@flow
def my_flow():
    # Start a group of task_1 instances
    group_1_futures = []
    for i in range(3):
        # Each task_1 is submitted independently
        future = task_1.submit(i)
        group_1_futures.append(future)

    # task_2 starts immediately, doesn't wait for group_1
    task_2_future = task_2.submit()

    # Optionally, if you need results later:
    # for f in group_1_futures:
    #     try:
    #         result = f.result()
    #     except Exception as e:
    #         print(f"A task_1 failed: {e}")

my_flow()
```
Key points about these solutions:
1. The key is using `submit()` for ALL tasks that should run independently. Regular task calls (like `task_1(i)`) block until the task finishes, which creates an implicit dependency on everything that follows.
2. Benefits of this approach:
- Tasks run truly in parallel
- Failures in one task_1 instance won't affect other task_1 instances or task_2
- You can still collect results later if needed
3. Things to watch out for:
- Resource usage: All tasks will run at once, so make sure your system can handle the concurrent load
- Memory usage: If you're creating many futures, they'll all be held in memory
- Error handling: You'll need to handle errors explicitly if you want to know about failures (see the sketch below)
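For that last point, a minimal sketch of explicit error handling against submitted futures; `result(raise_on_failure=False)` returns the exception from a failed run instead of raising it:
```python
from prefect import flow, task

@task
def task_1(i):
    if i == 1:
        raise ValueError(f"Instance {i} failed")
    return i * 2

@flow
def my_flow():
    futures = [task_1.submit(i) for i in range(3)]
    for i, f in enumerate(futures):
        outcome = f.result(raise_on_failure=False)
        if isinstance(outcome, Exception):
            print(f"Instance {i} failed: {outcome}")
        else:
            print(f"Instance {i} returned: {outcome}")

my_flow()
```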
If you need to limit concurrency (but still maintain independence), you can use tag-based task concurrency limits:
```python
from prefect import flow, task

# Create the limit once, e.g. via the CLI:
#   prefect concurrency-limit create limited 2
@task(retries=3, retry_delay_seconds=10, tags=["limited"])
def task_1(i):
    print(f"Running task 1 instance {i}")

@task
def task_2():
    print("Running task 2")

@flow
def my_flow():
    # Only two "limited"-tagged task runs execute at a time
    futures_1 = [task_1.submit(i) for i in range(5)]
    task_2.submit()  # Still starts immediately

my_flow()
```
This gives you the best of both worlds - independent execution but with controlled resource usage. The tasks will still be independent (task_2 won't wait for task_1 instances), but you won't overwhelm your system with too many concurrent executions.
Choose the approach that best balances your needs for independence, resource usage, and error handling. The first approach (simple submit) is usually the best place to start unless you have specific requirements around task grouping or concurrency control.

Alex Ilyin
07/30/2025, 6:59 PM

Marvin
07/30/2025, 7:01 PM
Here's how `wait_for` works with tasks in Prefect 2.x, with a comprehensive explanation and examples:
The `wait_for` argument is used with `task.submit()` to create explicit dependencies between tasks, especially when using parallel execution. Here's how to use it:
```python
from prefect import flow, task

@task
def task_1():
    print("Task 1 executing")
    return "Result from task 1"

@task
def task_2():
    print("Task 2 executing")
    return "Result from task 2"

@task
def task_3(input_data):
    print(f"Task 3 executing with input: {input_data}")

@flow
def my_flow():
    # Submit tasks 1 and 2 to run in parallel
    future_1 = task_1.submit()
    future_2 = task_2.submit()

    # Task 3 will wait for both task_1 and task_2 to complete before starting
    task_3.submit(
        "some input",
        wait_for=[future_1, future_2],
    )

my_flow()
```
Key points about `wait_for`:
1. Accepts Futures: The `wait_for` argument takes a list of futures (the results of `task.submit()` calls)
2. Multiple Dependencies: You can wait for multiple tasks:
```python
@flow
def my_flow():
    # Submit 3 instances of task_1
    futures = [task_1.submit() for _ in range(3)]

    # Wait for all task_1 instances to complete
    task_2.submit(wait_for=futures)
```
3. Selective Dependencies: You can choose which tasks to wait for:
```python
@flow
def my_flow():
    future_1 = task_1.submit()
    future_2 = task_2.submit()
    future_3 = task_3.submit()

    # task_4 only waits for task_1 and task_2, not task_3
    task_4.submit(wait_for=[future_1, future_2])
```
4. Error Handling: If any task in the `wait_for` list fails, the dependent task won't start:
```python
@flow
def my_flow():
    future_1 = task_1.submit()

    # task_2 won't start if task_1 fails
    future_2 = task_2.submit(wait_for=[future_1])

    # You can handle failures explicitly
    try:
        result = future_2.result()
    except Exception as e:
        print(f"Task 2 didn't run because: {e}")
```
5. Combining with allow_failure: You can let upstream tasks fail and still proceed by wrapping the upstream future in `allow_failure`:
```python
from prefect import allow_failure

@flow
def my_flow():
    future_1 = task_1.submit()

    # task_2 will run even if task_1 fails
    future_2 = task_2.submit(
        wait_for=[allow_failure(future_1)],
    )
```
Common use cases for `wait_for`:
- Ensuring data dependencies are met before processing
- Coordinating resource usage between tasks
- Creating complex dependency graphs while maintaining parallel execution where possible
- Implementing fan-in patterns where multiple upstream tasks feed into a single downstream task (see the sketch below)
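A minimal sketch of that fan-in pattern (the `extract`/`combine` task names are illustrative, not from the thread): passing the futures themselves as a parameter lets Prefect resolve them and wire up the dependencies, so no explicit `wait_for` is needed here.
```python
from prefect import flow, task

@task
def extract(i):
    return i * 10

@task
def combine(values):
    print(f"Combined total: {sum(values)}")

@flow
def fan_in_flow():
    # Fan out: several independent upstream runs
    futures = [extract.submit(i) for i in range(4)]

    # Fan in: Prefect resolves the futures passed as a parameter, so
    # combine starts only after every extract run finishes
    combine.submit(futures)

fan_in_flow()
```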
Remember:
- `wait_for` only works with futures returned from `task.submit()`
- It's different from regular sequential task calls, which create implicit dependencies (the sketch below contrasts the two)
- It's particularly useful when you want fine-grained control over task dependencies in parallel execution scenarios
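To make that contrast concrete, a small sketch (task definitions reuse the earlier example):
```python
from prefect import flow, task

@task
def task_1():
    return "Result from task 1"

@task
def task_3(input_data):
    print(f"Task 3 executing with input: {input_data}")

@flow
def implicit_vs_explicit():
    # Implicit dependency: task_3 receives task_1's output,
    # so Prefect orders them automatically
    task_3(task_1())

    # Explicit dependency: no data flows between the tasks,
    # but wait_for still forces task_3 to wait for task_1
    future_1 = task_1.submit()
    task_3.submit("some input", wait_for=[future_1])

implicit_vs_explicit()
```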
This mechanism gives you explicit control over task dependencies while still allowing for parallel execution where possible. It's especially useful when you have complex dependency patterns that can't be expressed through simple sequential task calls.