# ask-marvin
@Marvin what are the best practices to organize code between flows, tasks, and functions?
thought for 202.3 seconds
Short version:
- Plain functions for logic
- Tasks for work you want Prefect to manage (retries, caching, concurrency, visibility)
- Flows for orchestration and deployment

Longer, but still mercifully brief:

**What belongs in a plain function**
- Pure, reusable logic: transforms, parsing, small helpers
- Anything that doesn't need retries, caching, logging, or orchestration
- Keeps tests fast and code importable without Prefect context

**What belongs in a `@task`**
- External I/O and flaky calls: APIs, DB queries, S3/GCS, file I/O
- Expensive steps you may want to retry, cache, or time out
- Concurrency/rate-limiting boundaries
- Steps whose inputs/outputs you want tracked or persisted
- Note: call tasks from flows, not from other tasks

**What belongs in a `@flow` (and subflows)**
- Orchestration: parameter surface, branching, calling tasks
- Choosing task runners/concurrency model
- Run-level retries/timeouts (sparingly; prefer task-level retries)
- Subflows when a group of steps should be runnable/observable independently, have their own retries/timeouts, or deserve their own deployment

**Practical guidelines**
- Don't over-taskify: small, noisy tasks add orchestration overhead and clutter logs
- Keep side effects in tasks; keep business logic in functions
- Prefer task-level retries/timeouts; use flow-level only to cap total runtime
- Avoid passing huge objects between tasks (serialization cost). Write to storage and pass references/paths
- Use caching deliberately (stable task names + cache keys) for expensive deterministic steps; see the caching sketch after the minimal example
- Logging: use `get_run_logger()` inside flows/tasks; keep plain functions quiet
- Testing: test plain functions directly; call tasks/flows like normal functions in unit tests. Only use Prefect test utilities when you need state/engine behavior
- Structure: package your code
  - `src/your_project/flows/`: entrypoint flows (one per file is civilized)
  - `src/your_project/tasks/`: reusable tasks
  - `src/your_project/utils/` or `services/`: plain functions, clients, models
  - Flows import tasks; tasks import utils; avoid circular imports
- Deployments (Prefect 3.x): create deployments from flow entrypoints; don't rely on deprecated 2.x builders

**Minimal example**
```python
from prefect import flow, task, get_run_logger

# plain functions: pure logic
def clean_record(r):
    r["value"] = r["value"].strip().lower()
    return r

# tasks: I/O and orchestration-worthy steps
@task(retries=3, retry_delay_seconds=5, timeout_seconds=30)
def fetch_records(source_url: str) -> list[dict]:
    import requests
    r = requests.get(source_url, timeout=10)
    r.raise_for_status()
    return r.json()

@task(persist_result=True)
def write_to_warehouse(rows: list[dict]) -> int:
    # pretend to batch-insert, return count
    return len(rows)

# flow: orchestration
@flow(name="etl-pipeline")
def etl(source_url: str) -> int:
    logger = get_run_logger()
    raw = fetch_records(source_url)
    cleaned = [clean_record(r) for r in raw]  # pure function stays fast and testable
    count = write_to_warehouse(cleaned)
    logger.info(f"Loaded {count} rows")
    return count
```
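On the caching guideline above, a minimal sketch. It assumes the built-in `task_input_hash` helper (available in 2.x and 3.x; newer 3.x releases also offer cache policies), and `expensive_lookup` is a hypothetical task name:

```python
from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

# Cache on the task's inputs: re-running with the same URL reuses the stored
# result instead of repeating the expensive call, for up to one hour.
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def expensive_lookup(source_url: str) -> dict:  # hypothetical expensive, deterministic step
    ...
```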
**When to use subflows**
- You want to run a branch by itself sometimes
- You need separate retries/timeouts/notifications for a chunk of the pipeline
- You want separate deployments/schedules per logical stage (e.g., ingest, transform, publish)

**Common pitfalls to avoid**
- Calling tasks from tasks (don't; compose in flows)
- Turning every line into a task
- Passing gigantic data payloads between tasks
- Using 2.x-only APIs in 3.x (e.g., `Deployment.build_from_flow` and `prefect deployment build` are gone); a 3.x deployment sketch follows below

**Docs you may actually want to open**
- Flows
- Tasks
- Results and caching
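Here is that 3.x deployment sketch: deployments are created from the flow object itself. The schedule, repository URL, entrypoint path, and work pool name below are placeholders:

```python
from prefect import flow

@flow
def etl(source_url: str) -> int:  # stands in for the flow above
    ...

if __name__ == "__main__":
    # Simplest 3.x option: serve the flow from this process (creates a deployment).
    etl.serve(name="etl-local", cron="0 6 * * *")

    # Work-pool alternative, picked up by a worker (placeholder repo/entrypoint/pool):
    # flow.from_source(
    #     source="https://github.com/your-org/your-repo",
    #     entrypoint="src/your_project/flows/etl.py:etl",
    # ).deploy(name="etl-prod", work_pool_name="my-pool")
```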
If you share a quick sketch of your pipeline (steps, size of data, where the flakiness lives), I can propose a concrete module layout. I know, thrilling.
@Marvin in your example, you call a plain function in the flow instead of wrapping that function in a task and calling the task from the flow. Is this good practice?
thought for 71.7 seconds
Yes, that's intentional and good practice.

Use plain functions inside flows when:
- The logic is fast, deterministic, and doesn't need retries, caching, result persistence, or separate logging
- You don't need parallelism or a concurrency boundary
- You want simple, fast tests and less orchestration noise

Wrap it as a task when you need Prefect to manage it:
- Retries/timeouts, caching, result persistence, visibility in the UI
- Concurrency/rate-limiting or parallel execution
- A clean failure boundary so the whole flow doesn't retry

Rule of thumb:
- Keep business logic as plain functions
- Put I/O, flaky, slow, or parallelizable steps in tasks
- Orchestrate in flows

Same example, two ways. Plain function (simple and fine):
```python
@flow
def etl(url: str):
    raw = fetch_records(url)        # task: I/O, retries
    cleaned = [clean_record(r) for r in raw]  # plain function: cheap transform
    return write_to_warehouse(cleaned)  # task: I/O, persistence
```
Make it a task if you want parallelism/caching/retries:
```python
from prefect import task
from prefect.futures import resolve_futures_to_results

@task(retries=2)
def clean_record_task(r): 
    return clean_record(r)

@flow
def etl(url: str):
    raw = fetch_records(url)
    futs = [clean_record_task.submit(r) for r in raw]  # submit so records run concurrently under the task runner
    cleaned = resolve_futures_to_results(futs)
    return write_to_warehouse(cleaned)
```
Docs for reference: Flows, Tasks, Results and caching.

If you share where you need retries/parallelism, I'll point out exactly which bits should become tasks. Yes, I do live for this.
@Marvin I have some dictionaries to process, and I want to process them in parallel. The first step can crash, but if one of the dictionaries crashes, I still want to run the second step for the other dictionaries. How can I do that?
thought for 140.3 seconds
Short version: submit step 1 for each dict, let some fail, then run step 2 only for the ones that succeeded. Prefect 3 makes this pretty straightforward. Example pattern:
```python
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed, wait

@task
def first_step(d: dict) -> dict:
    # Do something that might raise
    if d.get("should_fail"):
        raise ValueError("Unlucky dictionary.")
    return {"id": d.get("id"), "processed": True}

@task
def second_step(original: dict, result: dict) -> None:
    # Do the follow-up work
    pass

@flow(task_runner=ThreadPoolTaskRunner())
def process_all(dicts: list[dict]):
    logger = get_run_logger()

    # Fan out step 1 in parallel
    futures = {first_step.submit(d): d for d in dicts}

    # For each finished step 1, run step 2 only if it succeeded
    step2_futures = []
    for fut in as_completed(list(futures)):
        d = futures[fut]
        try:
            res = fut.result()  # raises if first_step failed/crashed
        except Exception as exc:
            logger.warning(f"first_step failed for {d}: {exc}")
            continue
        step2_futures.append(second_step.submit(d, res))

    # Optionally wait for step 2 completions
    wait(step2_futures)
```
Notes:
- `ThreadPoolTaskRunner` runs tasks concurrently within the flow. Use it if you want parallelism in a single flow run.
- `as_completed` lets you react as each task finishes; failures are caught and don't block others.
- If you're not using Prefect and just want raw Python, the same pattern works with `concurrent.futures` (see the sketch below).

But since you asked, yes: you can let the broken ones crash in peace while the rest carry on.
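For reference, the raw-Python version of that pattern with `concurrent.futures`, no Prefect involved; `first_step` and `second_step` are plain functions here with the same shapes as above:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Plain functions standing in for the tasks above (same signatures, no decorators)
def first_step(d: dict) -> dict:
    if d.get("should_fail"):
        raise ValueError("Unlucky dictionary.")
    return {"id": d.get("id"), "processed": True}

def second_step(original: dict, result: dict) -> None:
    pass

def process_all(dicts: list[dict]) -> None:
    with ThreadPoolExecutor() as pool:
        # Fan out step 1
        futures = {pool.submit(first_step, d): d for d in dicts}

        step2 = []
        for fut in as_completed(futures):
            d = futures[fut]
            try:
                res = fut.result()  # raises if first_step failed
            except Exception as exc:
                print(f"first_step failed for {d}: {exc}")
                continue
            step2.append(pool.submit(second_step, d, res))

        # Surface any second_step errors
        for f in step2:
            f.result()
```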
@Marvin I work with Prefect 2
thought for 71.6 seconds
Got it, Prefect 2 it is. You can still fan out, let some inputs fail, and only run the follow-up for the survivors.

Pattern:
- Submit step 1 for each dict (in parallel).
- For each finished future, try to get its result; on failure, log and skip.
- Submit step 2 only for the successful ones.

Example:
```python
from prefect import flow, task, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
# If you prefer processes/cluster parallelism, use DaskTaskRunner instead.

@task
def first_step(d: dict) -> dict:
    if d.get("should_fail"):
        raise ValueError("Unlucky dictionary.")
    return {"id": d.get("id"), "processed": True}

@task
def second_step(original: dict, result: dict) -> None:
    # Do the follow-up work
    pass

@flow(task_runner=ConcurrentTaskRunner())
def process_all(dicts: list[dict]):
    logger = get_run_logger()

    # Fan out step 1 in parallel
    futures = {first_step.submit(d): d for d in dicts}

    # Option A: react as tasks complete (if your Prefect 2 version has as_completed)
    try:
        from prefect.futures import as_completed
        step2 = []
        for fut in as_completed(list(futures)):
            d = futures[fut]
            try:
                res = fut.result()  # raises if first_step failed/crashed
            except Exception as exc:
                logger.warning(f"first_step failed for {d}: {exc}")
                continue
            step2.append(second_step.submit(d, res))
        for f in step2:
            f.result()
    except ImportError:
        # Option B: simple fallback when as_completed isn't available; iterate and block in order
        step2 = []
        for fut, d in futures.items():
            try:
                res = fut.result()
            except Exception as exc:
                logger.warning(f"first_step failed for {d}: {exc}")
                continue
            step2.append(second_step.submit(d, res))
        for f in step2:
            f.result()
```
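If the work is CPU-bound or you want a cluster, the Dask variant mentioned in the comment at the top of the example is mostly a one-line change; this sketch assumes the `prefect-dask` collection is installed:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner  # pip install prefect-dask

# Same flow body as above; only the task runner changes.
# Pass address="tcp://..." to attach to an existing Dask cluster instead of a temporary one.
@flow(task_runner=DaskTaskRunner())
def process_all(dicts: list[dict]):
    ...
```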
Notes:
- Use `ConcurrentTaskRunner` for thread-based parallelism in a single flow run. If your work is CPU-bound or you want distributed execution, consider `DaskTaskRunner` (see the sketch above).
- If you prefer to wire the DAG and still continue despite failures, you can wrap first-step futures with `allow_failure` and make `second_step` handle error objects (sketch below), but skipping failed ones (as above) is simpler.

Do you want threads (ConcurrentTaskRunner) or processes/cluster (DaskTaskRunner)? And roughly how many dictionaries are we talking about? I'll tune the example accordingly.
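And for completeness, a sketch of the `allow_failure` wiring mentioned above, reusing `first_step` from the example. The assumption (worth verifying on your Prefect 2 version) is that a failed upstream resolves to its exception, so the downstream task has to check for it:

```python
from prefect import flow, task, allow_failure

@task
def second_step_tolerant(original: dict, result) -> None:
    # With allow_failure, a failed first_step arrives here as the raised exception.
    if isinstance(result, Exception):
        return  # skip the follow-up for this dictionary
    ...  # normal follow-up work

@flow
def process_all_with_allow_failure(dicts: list[dict]):
    futures = {first_step.submit(d): d for d in dicts}
    # Every second_step runs regardless of upstream failures; the task decides what to skip.
    results = [
        second_step_tolerant.submit(d, allow_failure(fut))
        for fut, d in futures.items()
    ]
    for f in results:
        f.result()
```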