Idriss Bellil
09/05/2025, 11:42 AM
Marvin
09/05/2025, 11:42 AM
Marvin
09/05/2025, 11:45 AM
`@flow` `PrefectFuture` `.result()` `.result()` `get_run_logger()`
from prefect import flow, task, get_run_logger, tags
@task(retries=2)
def step_a(x):
    """Return ``x`` plus one.

    Declared as a task with ``retries=2``.
    """
    incremented = x + 1
    return incremented
@task
def step_b(y):
    """Return ``y`` multiplied by two."""
    doubled = y * 2
    return doubled
@task
def combine(a, b):
    """Return ``a + b``, merging the outputs of two upstream steps."""
    total = a + b
    return total
# Plain helper (no decorators) — won't show up in the UI
def my_group(x, y):
    """Submit ``step_a``, ``step_b`` and ``combine`` as one logical group.

    The helper is a plain function: it carries no Prefect decorator of its
    own and only applies the ``"my-group"`` tag to the tasks it submits.

    Args:
        x: Input forwarded to ``step_a``.
        y: Input forwarded to ``step_b``.

    Returns:
        The future produced by submitting ``combine``; the caller decides
        when to resolve it with ``.result()``.
    """
    logger = get_run_logger()
    with tags("my-group"):  # optional tag
        # Use .submit() rather than a direct call: a direct call blocks and
        # hands back the plain return value, so the caller's .result() would
        # fail. .submit() returns a PrefectFuture straight away.
        fa = step_a.submit(x)
        fb = step_b.submit(y)
        # Upstream futures are passed as-is; they are resolved before
        # combine's body runs.
        result_future = combine.submit(fa, fb)
    logger.debug("Submitted my-group tasks for x=%s, y=%s", x, y)
    return result_future    # return a future to allow the caller to control when to resolve
@flow
def main(x: int, y: int):
    """Run the grouped steps for ``x`` and ``y`` and return the resolved value.

    ``my_group`` is invoked first and its return value is held unresolved,
    leaving room for other work before the final ``.result()`` call.
    """
    pending = my_group(x, y)
    # do other work in parallel if you want...
    # resolve when you actually need the value
    value = pending.result()
    return value
if __name__ == "__main__":
    print(main(1, 10))


@task(name="my_group", retries=3, timeout_seconds=600)
def my_group_task(x, y):
    """Run the ``my_group`` helper inside a single task and return its value.

    Declared as a task named ``"my_group"`` with ``retries=3`` and
    ``timeout_seconds=600``.

    Args:
        x: Input forwarded to ``my_group``.
        y: Input forwarded to ``my_group``.

    Returns:
        The value obtained by calling ``.result()`` on what ``my_group``
        returns.
    """
    # Run the same helper and resolve to a concrete value so the parent task can retry as a unit
    return my_group(x, y).result()
# Chat-export fragment that was fused onto the line above: as_completed