José Agretti
08/11/2025, 7:34 PM
Marvin
08/11/2025, 7:37 PM
- Keep most tests as plain unit tests that call the underlying function (via .fn) with normal Python tooling.
- Only use real Prefect orchestration in a few integration tests when you truly need to verify retries, .submit, logging context, etc.
Recipes
1) Fast unit tests: call tasks/flows with .fn to bypass orchestration
from prefect import task, flow

def _add(x, y):
    return x + y

@task
def add(x, y):
    return _add(x, y)

def test_add_unit():
    assert add.fn(1, 2) == 3

@flow
def compute():
    return add(1, 2)

def test_flow_unit():
    # .fn bypasses orchestration and concurrency
    assert compute.fn() == 3
2) Mock external dependencies as usual
# module.py
from prefect import task
import httpx

@task
def fetch_json(url):
    return httpx.get(url, timeout=5).json()

# test_module.py
from module import fetch_json

def test_fetch_json(mocker):
    mock_resp = mocker.Mock()
    mock_resp.json.return_value = {"ok": True}
    mocker.patch("module.httpx.get", return_value=mock_resp)
    assert fetch_json.fn("https://example.com") == {"ok": True}
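Note: the mocker fixture comes from the pytest-mock plugin; plain unittest.mock.patch works just as well here.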
3) Tasks that use get_run_logger
get_run_logger() requires a run context. Two options:
- Unit tests: patch it to return a standard logger.
- Integration tests: run under a real flow (not .fn) so context exists; a sketch follows the patch example below.
Patch for unit tests:
import logging
from unittest.mock import patch

import prefect
from prefect import task

@task
def my_task():
    logger = prefect.get_run_logger()
    logger.info("hi")
    return 42

def test_my_task_unit():
    with patch("prefect.get_run_logger", return_value=logging.getLogger("test")):
        assert my_task.fn() == 42
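For the integration-style option, a real flow run provides the context get_run_logger() needs, so no patching is required. A minimal sketch (the flow and test names here are illustrative, not from the thread above):

from prefect import flow, task, get_run_logger

@task
def logging_task():
    # Works because a real task run context exists under the flow
    get_run_logger().info("hi")
    return 42

@flow
def logging_flow():
    return logging_task()

def test_logging_task_integration():
    # Calling the flow directly runs it locally with a real run context
    assert logging_flow() == 42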
4) Testing submit/mapping/concurrency
If you need to exercise .submit, run inside a flow normally (no .fn), then collect results from futures.
from prefect import task, flow

@task
def add(x, y):
    return x + y

@flow
def run_parallel():
    futs = [add.submit(i, 1) for i in range(3)]
    return [f.result() for f in futs]

def test_submit_integration():
    assert run_parallel() == [1, 2, 3]
Note: This invokes Prefect’s runtime; it’s slower but validates futures, logging context, and concurrency.
5) Testing retries/timeouts (integration-style)
from prefect import task, flow

@task(retries=2, retry_delay_seconds=0)
def flaky(counter: list[int]):
    counter[0] += 1
    if counter[0] < 3:
        raise RuntimeError("not yet")
    return "ok"

@flow
def run_flaky():
    # mutable counter to observe retry attempts
    counter = [0]
    return flaky(counter)

def test_retries():
    assert run_flaky() == "ok"  # will retry twice before succeeding
Guidelines and caveats
- Prefer testing the pure function behind a task. The decorator shouldn’t hide complexity.
- .fn bypasses Prefect orchestration: no retries, no timeouts, no run context, no futures. Perfect for unit tests; useless for testing orchestration features.
- If your task/flow depends on Prefect runtime (e.g., get_run_logger, prefect.runtime.*, task submission), write a focused integration test that runs a real flow call.
- Don’t mock Prefect internals. Mock your code’s dependencies instead.
- You don’t need a worker or work pool for unit/integration tests; you can run flows locally (see the fixture sketch below). Keep Vertex AI tests for end-to-end deployment coverage, not unit tests.
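On that last point: when tests do run real flows, Prefect's prefect_test_harness utility points those runs at a temporary local database so they stay isolated from your real workspace. A minimal session-scoped pytest fixture, as a sketch:

import pytest
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    # All flow runs during the test session execute against a throwaway local backend
    with prefect_test_harness():
        yield

With this in place, the integration-style tests above run locally without any worker, work pool, or Vertex AI infrastructure.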
If you want, I can tailor examples to your exact patterns (e.g., tasks that call other tasks via .submit, tasks using blocks/secrets, or pytest fixtures you already use).