# ask-marvin
@Marvin what is the best way to mock and test Prefect tasks?
thought for 131.7 seconds
Short answer: test the function, not the orchestration. In Prefect 3.x, the most reliable pattern is:
- Keep business logic in plain functions.
- Wrap them with `@task`/`@flow` as thin adapters.
- Unit test the logic (or the task/flow via `.fn`) with normal Python tooling.
- Only use real Prefect orchestration in a few integration tests when you truly need to verify retries, `.submit`, logging context, etc.

Recipes

1) Fast unit tests: call tasks/flows with `.fn` to bypass orchestration
```python
from prefect import task, flow

def _add(x, y):
    return x + y

@task
def add(x, y):
    return _add(x, y)

def test_add_unit():
    assert add.fn(1, 2) == 3

@flow
def compute():
    return add(1, 2)

def test_flow_unit():
    # .fn bypasses orchestration and concurrency
    assert compute.fn() == 3
```

2) Mock external dependencies as usual
```python
# module.py
from prefect import task
import httpx

@task
def fetch_json(url):
    return httpx.get(url, timeout=5).json()

# test_module.py
from module import fetch_json

def test_fetch_json(mocker):
    mock_resp = mocker.Mock()
    mock_resp.json.return_value = {"ok": True}
    mocker.patch("module.httpx.get", return_value=mock_resp)

    assert fetch_json.fn("https://example.com") == {"ok": True}
```

3) Tasks that use `get_run_logger`
`get_run_logger()` requires a run context. Two options:
- Unit tests: patch it to return a standard logger.
- Integration tests: run under a real flow (not `.fn`) so the run context exists (see the sketch after the unit-test example below).

Patch for unit tests:
```python
import logging
from unittest.mock import patch

import prefect
from prefect import task

@task
def my_task():
    logger = prefect.get_run_logger()
    logger.info("hi")
    return 42

def test_my_task_unit():
    with patch("prefect.get_run_logger", return_value=logging.getLogger("test")):
        assert my_task.fn() == 42
```
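
For the integration-test option above, here's a minimal sketch (the `logged_task`/`logged_flow` names are just illustrative): run the task inside a real flow so `get_run_logger()` finds a run context, no patching needed.
```python
from prefect import flow, task, get_run_logger

@task
def logged_task():
    # Works because the task executes inside a real flow run context
    logger = get_run_logger()
    logger.info("hi")
    return 42

@flow
def logged_flow():
    return logged_task()

def test_logged_task_integration():
    # Calling the flow directly runs it locally with full Prefect context
    assert logged_flow() == 42
```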

4) Testing submit/mapping/concurrency
If you need to exercise `.submit`, run inside a flow normally (no `.fn`), then collect results from futures; a `.map` sketch for the mapping case follows the note below.
```python
from prefect import task, flow

@task
def add(x, y):
    return x + y

@flow
def run_parallel():
    futs = [add.submit(i, 1) for i in range(3)]
    return [f.result() for f in futs]

def test_submit_integration():
    assert run_parallel() == [1, 2, 3]
```

Note: This invokes Prefect's runtime; it's slower but validates futures, logging context, and concurrency.
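
For the "mapping" part of this recipe, a sketch along the same lines, assuming Prefect's `Task.map` and `unmapped` behave as in current 3.x; like `.submit`, `.map` only works inside a real flow:
```python
from prefect import task, flow, unmapped

@task
def add(x, y):
    return x + y

@flow
def run_mapped():
    # .map submits one task run per element; unmapped() keeps y constant
    futs = add.map(x=[0, 1, 2], y=unmapped(1))
    return [f.result() for f in futs]

def test_map_integration():
    assert run_mapped() == [1, 2, 3]
```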

5) Testing retries/timeouts (integration-style)
```python
from prefect import task, flow

@task(retries=2, retry_delay_seconds=0)
def flaky(counter: list[int]):
    counter[0] += 1
    if counter[0] < 3:
        raise RuntimeError("not yet")
    return "ok"

@flow
def run_flaky():
    # mutable counter to observe retry attempts
    counter = [0]
    return flaky(counter)

def test_retries():
    assert run_flaky() == "ok"   # will retry twice before succeeding
```
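
The heading also mentions timeouts; here's a minimal sketch of that case, assuming `timeout_seconds` and `return_state=True` behave as in current Prefect 3.x (names are illustrative):
```python
import time
from prefect import task, flow

@task(timeout_seconds=0.1)
def slow():
    time.sleep(1)  # deliberately exceeds the timeout
    return "never"

@flow
def run_slow():
    return slow()

def test_timeout():
    # return_state=True returns the final State instead of raising on failure
    state = run_slow(return_state=True)
    assert state.is_failed()
```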

Guidelines and caveats
- Prefer testing the pure function behind a task. The decorator shouldn't hide complexity.
- `.fn` bypasses Prefect orchestration: no retries, no timeouts, no run context, no futures. Perfect for unit tests; useless for testing orchestration features.
- If your task/flow depends on the Prefect runtime (e.g., `get_run_logger`, `prefect.runtime.*`, task submission), write a focused integration test that makes a real flow call.
- Don't mock Prefect internals. Mock your code's dependencies instead.
- You don't need a worker or work pool for unit/integration tests; you can run flows locally (a fixture sketch follows below). Keep Vertex AI tests for end-to-end deployment coverage, not unit tests.

If you want, I can tailor examples to your exact patterns (e.g., tasks that call other tasks via `.submit`, tasks using blocks/secrets, or pytest fixtures you already use).
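
As a follow-up to the worker/work-pool point above: if integration tests shouldn't touch your real Prefect API, one common pattern is a session-scoped pytest fixture built on `prefect_test_harness` (assuming that utility is available in your Prefect version), so every flow/task run in the test session hits a throwaway local instance:
```python
# conftest.py (illustrative fixture; adapt to your setup)
import pytest
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    # All flow/task runs in this session use a temporary local database/API
    with prefect_test_harness():
        yield
```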