Idriss Bellil
09/05/2025, 11:42 AM
Marvin
09/05/2025, 11:45 AM
@flow run; then all tasks it calls are fully orchestrated, tracked in the UI, and respect retries, caching, timeouts, etc. If you call tasks outside a flow, they won’t be tracked by Prefect.
- The helper is invisible in the UI: You’ll see the individual task runs, not the helper function. That means no group-level name, retries, cache, or timing in the UI (those still apply at the task level).
- Futures and concurrency: Calling a task with .submit() inside a flow returns a PrefectFuture. Use .result() when you need the actual value. If you don’t call .result() immediately, submitted tasks can run concurrently (subject to your task runner). Calling a task directly, as in step_a(x), blocks and returns the value itself.
  - API: Prefect futures API
- Logging from helpers: You can use get_run_logger() inside the helper; logs will attach to the parent flow run.
  - API: get_run_logger
- Tags and limits: You can apply tags within the helper to influence concurrency limits or for organization.
  - API: tags context manager
- When you might still want a subflow or wrapper:
  - If you need a single “unit” with its own retries, timeout, parameter schema, or a separate run record, use a subflow.
  - Alternative: wrap the group in a single parent task to get group-level retries/timeouts without a subflow (this adds one parent task node in the UI but avoids a subflow run).
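The point about futures and concurrency above follows the same submit-then-resolve pattern as Python's standard concurrent.futures module. The sketch below uses only the standard library, not Prefect, so nothing in it is tracked, retried, or cached by Prefect; the function names (slow_add_one, slow_double, run_group) are hypothetical and exist only for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_add_one(x):
    time.sleep(0.2)  # stand-in for real work
    return x + 1


def slow_double(y):
    time.sleep(0.2)  # stand-in for real work
    return y * 2


def run_group(executor, x, y):
    # Submit both steps first; neither call blocks, so they can overlap.
    fa = executor.submit(slow_add_one, x)
    fb = executor.submit(slow_double, y)
    # Resolve the futures only at the point where the values are needed.
    return fa.result() + fb.result()


with ThreadPoolExecutor(max_workers=2) as executor:
    total = run_group(executor, 1, 10)

print(total)
```

If run_group called fa.result() before submitting the second step, the two steps would run one after the other; deferring .result() is what lets them overlap.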
Minimal example (sync flow)
    from prefect import flow, task, get_run_logger, tags


    @task(retries=2)
    def step_a(x):
        return x + 1


    @task
    def step_b(y):
        return y * 2


    @task
    def combine(a, b):
        return a + b


    # Plain helper (no decorators): won't show up in the UI
    def my_group(x, y):
        logger = get_run_logger()
        logger.info("Submitting my_group tasks")
        with tags("my-group"):  # optional tag
            fa = step_a.submit(x)  # .submit() returns a PrefectFuture
            fb = step_b.submit(y)  # .submit() returns a PrefectFuture
            # Futures passed as arguments are resolved before combine runs
            result_future = combine.submit(fa, fb)
        return result_future  # return a future so the caller controls when to resolve


    @flow
    def main(x: int, y: int):
        f = my_group(x, y)
        # do other work in parallel if you want...
        return f.result()  # resolve when you actually need the value


    if __name__ == "__main__":
        print(main(1, 10))
If you want group-level retries/timeouts without a subflow, wrap the helper in a parent task:
    @task(name="my_group", retries=3, timeout_seconds=600)
    def my_group_task(x, y):
        # Run the same helper and resolve to a concrete value so the parent task can retry as a unit
        return my_group(x, y).result()
This adds one parent task node to the UI but avoids a subflow run.
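To make "retry as a unit" concrete, here is a rough plain-Python sketch of the idea. It is not how Prefect implements retries; retry_as_unit, flaky_group, and the calls counter are hypothetical names used only for illustration. It assumes that a group-level retry reruns the whole wrapped body, including steps that had already succeeded.

```python
import time


def retry_as_unit(fn, retries=3, delay_seconds=0.0):
    """Call fn(); on any exception, rerun the whole callable up to `retries` more times."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # out of attempts: surface the last error
            time.sleep(delay_seconds)


# Count how often each step inside the group actually runs.
calls = {"step_a": 0, "step_b": 0}


def flaky_group():
    calls["step_a"] += 1
    a = 1 + 1  # first step always succeeds
    calls["step_b"] += 1
    if calls["step_b"] < 3:
        raise RuntimeError("step_b failed")  # second step fails twice
    return a + 20


result = retry_as_unit(flaky_group, retries=3)
print(result, calls)
```

Note that the first step is rerun on every attempt even though it never failed. If that is a concern with a parent-task wrapper, task-level caching on the inner steps is the usual way to avoid repeating work.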
Would you like an async version or an example that parallelizes many inputs with as_completed?