Solomon Negusse
07/17/2025, 10:01 PM
utils.py
```python
def common(a):
    return a * 2
```
What’s the best practice between these two options for using that function:
a) Create a common task that’s then invoked from the flows like so:
prefect_common.py
```python
from prefect import task
from utils import common

@task
def common_task(a):
    return common(a)
```
flow_a.py
```python
from prefect import task
from prefect_common import common_task

@task
def task_one():
    pass

@flow
def flow():
    prefect_common(3)
    task_one()
```
flow_b.py
```python
from prefect import task
from prefect_common import common_task

@task
def task_two():
    pass

@flow
def flow():
    prefect_common(5)
    task_two()
```
b) Create a separate task in each flow that uses the common function:
flow_a.py
```python
from prefect import task
from utils import common

@task
def task_one():
    pass

@task
def common_task(a):
    return common(a)

@flow
def flow():
    common_task(3)
    task_one()
```
flow_b.py
```python
from prefect import task
from utils import common

@task
def task_two():
    pass

@task
def common_task(a):
    return common(a)

@flow
def flow():
    common_task(5)
    task_two()
```
Marvin
07/17/2025, 10:02 PM
Option A (a common task defined once in a shared module) is generally the better practice. Here's why:
1. Reusability: Defining the task once (in prefect_common.py) means you only need to maintain and update the task logic in one location. If you need to modify the task's behavior, you only change it once.
2. Consistency: With a single task definition, you ensure that all flows are using exactly the same task implementation. This prevents accidental divergence that could occur if you had multiple copies of the same task.
3. Maintainability: When your codebase grows, having reusable components in separate modules makes the code more organized and easier to maintain. It follows the DRY (Don't Repeat Yourself) principle.
4. Testing: It's easier to write and maintain tests for a single task implementation rather than multiple copies across different flows.
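Point 4 can be made concrete: the shared logic is an ordinary Python function, so it can be unit-tested without running any flow (and in Prefect 2.x, the undecorated function behind a task is also reachable as common_task.fn). A minimal sketch, with common() inlined from utils.py so the example is self-contained:

```python
# test_common.py -- a minimal sketch of unit-testing the shared logic directly.
# common() is inlined here from utils.py so the example is self-contained.

def common(a):
    return a * 2

def test_common_doubles_input():
    assert common(3) == 6
    assert common(-2) == -4

test_common_doubles_input()
```

With Option B, the same test would have to be duplicated (or parameterized over every copy) to cover each flow's private task definition.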
Option B (defining the same task in each flow) has several disadvantages:
- Code duplication
- Higher maintenance burden
- Risk of inconsistency if one copy is updated but others aren't
- More difficult to test
Here's a slightly improved version of Option A that I would recommend:
```python
# utils.py
def common(a):
    return a * 2

# prefect_common.py
from prefect import task
from utils import common

@task(
    name="common_multiplication_task",
    description="Multiplies input by 2",
    retries=2,  # adding some task configuration as an example
)
def common_task(a: int) -> int:
    return common(a)

# flow_a.py
from prefect import flow, task
from prefect_common import common_task

@task
def task_one():
    pass

@flow
def flow_a():
    result = common_task(3)
    task_one()

# flow_b.py
from prefect import flow, task
from prefect_common import common_task

@task
def task_two():
    pass

@flow
def flow_b():
    result = common_task(5)
    task_two()
```
The improvements in this version include:
- Added type hints for better code clarity
- Added task configuration (name, description, retries) which will be consistent across all uses
- Properly captured the task result in a variable (though not used in this example)
- Fixed the function call (your original had prefect_common(3) instead of common_task(3))
This structure scales well as your project grows, and it's particularly useful if you need to add more complex functionality to the common task later, such as retries, caching, or logging.
Solomon Negusse
07/17/2025, 10:04 PM
What if the two flows need different configurations for the common task, for example different timeouts or retries?
Marvin
07/17/2025, 10:05 PM
There are a couple of ways to handle per-flow configuration:
1. Using a Task Factory: Wrap the task definition in a function that takes the configuration as arguments:
```python
# prefect_common.py
from prefect import task
from utils import common

def create_common_task(timeout_seconds=None, retries=None):
    @task(
        name="common_multiplication_task",
        timeout_seconds=timeout_seconds,
        retries=retries,
    )
    def common_task(a):
        return common(a)
    return common_task

# flow_a.py - needs more retries and a longer timeout
from prefect import flow, task
from prefect_common import create_common_task

@task
def task_one():
    pass

@flow
def flow_a():
    # Create the task with specific settings for this flow
    common_task = create_common_task(timeout_seconds=600, retries=5)
    result = common_task(3)
    task_one()

# flow_b.py - needs fewer retries and a shorter timeout
from prefect import flow, task
from prefect_common import create_common_task

@task
def task_two():
    pass

@flow
def flow_b():
    # Create the task with different settings for this flow
    common_task = create_common_task(timeout_seconds=300, retries=2)
    result = common_task(5)
    task_two()
```
2. Using Separate Task Definitions: If the configurations are very different and you want to make it more explicit:
```python
# prefect_common.py
from prefect import task
from utils import common

@task(
    name="common_multiplication_task_standard",
    timeout_seconds=300,
    retries=2,
)
def common_task_standard(a):
    return common(a)

@task(
    name="common_multiplication_task_extended",
    timeout_seconds=600,
    retries=5,
)
def common_task_extended(a):
    return common(a)

# flow_a.py - needs more robust settings
from prefect import flow, task
from prefect_common import common_task_extended

@flow
def flow_a():
    result = common_task_extended(3)
    # ...

# flow_b.py - needs standard settings
from prefect import flow, task
from prefect_common import common_task_standard

@flow
def flow_b():
    result = common_task_standard(5)
    # ...
```
Recommendation:
I recommend the first approach (Task Factories) because:
1. It's more flexible - you can create tasks with any configuration at runtime
2. It reduces code duplication - you're not maintaining multiple similar task definitions
3. It's more maintainable - if you need to change the underlying logic, you only change it in one place
4. It's more explicit in each flow what settings are being used
The second approach might be better if:
1. You have a fixed set of configurations that won't change
2. You want to make it very clear in the code what configurations exist
3. You want to prevent arbitrary configuration changes
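Under the hood, the factory approach is ordinary Python closures: each call to the factory builds and returns an independent, freshly configured task. Stripped of Prefect, the mechanism reduces to a sketch like this (make_task is a hypothetical stand-in for create_common_task, and the hand-rolled retry loop stands in for what @task's retries setting does):

```python
# Prefect-free sketch of the factory mechanism: each factory call returns an
# independently configured callable wrapping the same shared logic.

def common(a):
    return a * 2

def make_task(retries=0):
    def task_like(a):
        attempts = 0
        while True:
            try:
                return common(a)
            except Exception:
                attempts += 1
                if attempts > retries:
                    raise
    return task_like

robust = make_task(retries=5)    # one configuration...
standard = make_task(retries=0)  # ...and another, from the same definition
print(robust(3), standard(5))    # -> 6 10
```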
Note that in both cases, the task configurations are set when the task is created, not when it's called. Prefect doesn't provide a way to mutate a task's settings at runtime after it has been created (though Task.with_options() can derive a new task with different settings). This is by design, to ensure task execution behavior is predictable and consistent within a flow run.
Marvin
07/17/2025, 10:06 PM
To summarize the options:
1. Use a Task Factory Function
```python
# prefect_common.py
from typing import Optional

from prefect import task
from utils import common

def create_common_task(
    timeout_seconds: Optional[int] = None,
    retries: Optional[int] = None,
    name: Optional[str] = None,
):
    @task(
        timeout_seconds=timeout_seconds,
        retries=retries,
        name=name or "common_multiplication_task",
    )
    def common_task(a: int) -> int:
        return common(a)
    return common_task

# flow_a.py
from prefect import flow, task
from prefect_common import create_common_task

@task
def task_one():
    pass

@flow
def flow_a():
    # Create the task with specific configuration for this flow
    common_task = create_common_task(timeout_seconds=300, retries=5)
    result = common_task(3)
    task_one()

# flow_b.py
from prefect import flow, task
from prefect_common import create_common_task

@task
def task_two():
    pass

@flow
def flow_b():
    # Different configuration for this flow
    common_task = create_common_task(timeout_seconds=60, retries=2)
    result = common_task(5)
    task_two()
```
2. Create Multiple Task Variants
```python
# prefect_common.py
from prefect import task
from utils import common

@task(timeout_seconds=300, retries=5, name="common_task_long")
def common_task_long_running(a: int) -> int:
    return common(a)

@task(timeout_seconds=60, retries=2, name="common_task_standard")
def common_task_standard(a: int) -> int:
    return common(a)

# Then use the appropriate variant in each flow
```
3. Your Original Approach (Option B)
If you have significantly different requirements for each flow and the task configurations vary widely, defining separate tasks in each flow (your Option B) is actually a valid approach. It's not as DRY, but it gives you the most flexibility.
The factory function approach (#1) is recommended because it:
- Maintains the single source of truth for the actual business logic
- Allows full customization of task configuration per flow
- Keeps the task creation code in one place
- Makes it easy to add new configuration options later
- Provides better type hinting and IDE support
- Makes it clear which configurations are being used in each flow
Important notes:
1. Task configurations (like retries, timeout) are set when the task is defined, not when it's called
2. A task's configuration cannot be mutated at runtime after it is defined, though Task.with_options() can derive a new task with different settings
3. If you need truly dynamic runtime behavior, you might want to use the retry_condition_fn parameter to implement custom retry logic based on runtime conditions
Choose the approach that best balances your needs for code reuse versus configuration flexibility. If the configurations are vastly different between flows and you prefer the clarity of seeing all configurations directly in each flow file, Option B (your original second approach) is still a valid choice.