<@ULVA73B9P> I want to group tasks together so I c...
# ask-marvin
i
@Marvin I want to group tasks together so I can re-use them, I don't like subflows as they show up separately in the UI, can I simply call a group of tasks in a simple Python function or are there other things I should be aware of?
m
thought for 171.4 seconds
Short answer: yes. In Prefect 3.x you can group tasks inside a plain Python helper function and call that helper from your flow. The helper itself won’t appear in the UI; only the tasks it calls will. This is a common pattern for reuse without introducing subflows. What to keep in mind - Call it from inside a flow: Make sure your helper runs within an active
@flow
run; then all tasks it calls are fully orchestrated, tracked in the UI, and respect retries, caching, timeouts, etc. If you call tasks outside a flow, they won’t be tracked by Prefect. - The helper is invisible in the UI: You’ll see the individual task runs, not the helper function. That means no group-level name, retries, cache, or timing in the UI (those still apply at the task level). - Futures and concurrency: Calling a task inside a flow returns a
PrefectFuture
. Use
.result()
when you need the actual value. If you don’t call
.result()
immediately, tasks can run concurrently (subject to your task runner). - API: Prefect futures API - Logging from helpers: You can use
get_run_logger()
inside the helper; logs will attach to the parent flow run. - API: get_run_logger - Tags and limits: You can apply tags within the helper to influence concurrency limits or for organization. - API: tags context manager - When you might still want a subflow or wrapper: - If you need a single “unit” with its own retries, timeout, parameter schema, or a separate run record, use a subflow. - Alternative: wrap the group in a single parent task to get group-level retries/timeouts without a subflow (this adds one parent task node in the UI but avoids a subflow run). Minimal example (sync flow)
Copy code
from prefect import flow, task, get_run_logger, tags

@task(retries=2)
def step_a(x):
    return x + 1

@task
def step_b(y):
    return y * 2

@task
def combine(a, b):
    return a + b

# Plain helper (no decorators) — won't show up in the UI
def my_group(x, y):
    logger = get_run_logger()
    with tags("my-group"):  # optional tag
        fa = step_a(x)      # returns PrefectFuture
        fb = step_b(y)      # returns PrefectFuture
        result_future = combine(fa, fb)
    return result_future    # return a future to allow the caller to control when to resolve

@flow
def main(x: int, y: int):
    f = my_group(x, y)
    # do other work in parallel if you want...
    return f.result()  # resolve when you actually need the value

if __name__ == "__main__":
    print(main(1, 10))
If you want group-level retries/timeouts without a subflow, wrap the helper in a parent task:
Copy code
@task(name="my_group", retries=3, timeout_seconds=600)
def my_group_task(x, y):
    # Run the same helper and resolve to a concrete value so the parent task can retry as a unit
    return my_group(x, y).result()
This adds one parent task node to the UI but avoids a subflow run. Would you like an async version or an example that parallelizes many inputs with
as_completed
?