<@ULVA73B9P> Is there a way to call a task from wi...
# ask-marvin
t
@Marvin Is there a way to call a task from within a task? I thought that was not allowed previously, but I am seeing one of my prefect scripts is "successfully" calling a task from within a task.
m
Short answer: yes, it can happen, but it’s usually not what you want.

What you’re seeing is expected in Prefect 3.x: if a task is called from within another task, Prefect treats the inner call as its own task run, and when a flow run is active that run belongs to the same flow. Earlier versions did not allow this. Prefect 2.x raised an error when a task was called from inside another task (the workaround was to call the underlying function via `.fn()`), and Prefect 1.x built a static DAG, so that is likely what you’re remembering.

A few important notes and caveats:
- It “works,” but is discouraged: orchestrate tasks from the flow level whenever possible. Nesting tasks inside tasks makes retries, caching, and concurrency harder to reason about.
- Concurrency: the inner task is submitted to the flow’s task runner. Whether it runs concurrently depends on your runner and available capacity.
- Retries/caching: the inner task has its own retry and caching policies, but if the outer task retries, it may re-submit the inner task, which can be surprising.
- Debugging/logging: child task logs and states are tracked separately; a child failure bubbles up to the parent if the parent waits on the child’s result.

Recommended patterns instead:
- Compose at the flow level (preferred)
- Use subflows when you want a reusable, independently managed unit
- Use plain Python functions for code reuse and call them from tasks

Examples
- What you’re likely doing now (works, but not recommended):
```python
from prefect import task, flow

@task
def child(x):
    return x + 1

@task
def parent(x):
    # This schedules another task run from inside a task
    fut = child.submit(x)
    return fut.result()

@flow
def my_flow(x):
    return parent(x)
```
- Better: orchestrate from the flow
```python
from prefect import task, flow

@task
def child(x):
    return x + 1

@task
def parent_logic(x):
    # pure logic or lightweight work only
    return x * 2

@flow
def my_flow(x):
    a = parent_logic.submit(x)
    b = child.submit(a)
    return b.result()
```
- Or factor shared logic into a plain function
```python
from prefect import task

def compute(x):
    return x + 1

@task
def child(x):
    return compute(x)

@task
def parent(x):
    # Reuse the same logic without creating nested task runs
    return compute(x)
```
- Use subflows for larger units
```python
from prefect import flow

@flow
def child_flow(x):
    return x + 1

@flow
def parent_flow(x):
    return child_flow(x)  # a separate subflow run
```
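To make the retry caveat concrete, here is a minimal plain-Python model of an outer unit of work that is retried after its nested call has already finished. It is only a sketch and not Prefect's implementation: `run_with_retries`, `child`, `parent`, and the `calls` counter are hypothetical names, and the model ignores caching on the child, which could change the outcome in a real run.

```python
# Plain-Python model of "outer retry re-runs the inner work".
# All names here are hypothetical; this does not use or mirror Prefect's engine.

calls = {"child": 0, "parent": 0}

def child(x):
    # Stands in for the nested (inner) task.
    calls["child"] += 1
    return x + 1

def parent(x):
    # Stands in for the outer task: it runs the child, then fails twice.
    calls["parent"] += 1
    y = child(x)  # the nested work runs again on every attempt
    if calls["parent"] < 3:
        raise RuntimeError("transient failure after the child finished")
    return y * 2

def run_with_retries(fn, *args, retries=2):
    """Call fn, re-running it from the top up to `retries` extra times."""
    for attempt in range(retries + 1):
        try:
            return fn(*args)
        except RuntimeError:
            if attempt == retries:
                raise

result = run_with_retries(parent, 1, retries=2)
print(result, calls)
```

In this model the child runs once per parent attempt, even though it succeeded the first time. Orchestrating both steps from the flow keeps each step's retries independent.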
If you can share which Prefect version you’re on, I can tailor the guidance to that version. If you want, drop a snippet of the “nested task” you’re seeing and I’ll suggest the cleanest refactor.

References
- Workflow design patterns with Prefect
- Prefect 3 GA announcement (covers execution model and design goals)
- Task runners and concurrency concepts: Task runners in Prefect 3 and Running work concurrently