Florent VanDeMoortele
09/23/2025, 9:32 AM

Marvin
09/23/2025, 9:33 AM

Marvin
09/23/2025, 9:40 AM
1) Simple: pass plain dicts
from prefect import flow, task
@task
def clean(d: dict) -> dict:
    return {k: v.strip() if isinstance(v, str) else v for k, v in d.items()}

@task
def process(d: dict) -> dict:
    d["length"] = len(d)
    return d

@flow
def pipeline(data: dict):
    cleaned = clean(data)
    return process(cleaned)

pipeline({"name": " Alice ", "city": " Paris "})
2) Structured: pass a dataclass
from dataclasses import dataclass
from typing import Optional
from prefect import flow, task

@dataclass
class Record:
    id: int
    name: str
    city: Optional[str] = None

@task
def clean(r: Record) -> Record:
    return Record(
        id=r.id,
        name=r.name.strip(),
        city=r.city.strip() if r.city else None,
    )

@task
def process(r: Record) -> dict:
    return {"id": r.id, "length": len(r.name)}

@flow
def pipeline(record: Record):
    return process(clean(record))

pipeline(Record(id=1, name=" Alice ", city=" Paris "))
3) Robust + persistence-friendly: Pydantic + JSON results
from pydantic import BaseModel, Field
from prefect import flow, task
from prefect.serializers import JSONSerializer

class Record(BaseModel):
    id: int
    name: str = Field(min_length=1)
    city: str | None = None

@task(persist_result=True, result_serializer=JSONSerializer())
def clean(r: Record) -> Record:
    return Record(
        id=r.id,
        name=r.name.strip(),
        city=r.city.strip() if r.city else None,
    )

@task(persist_result=True, result_serializer=JSONSerializer())
def process(r: Record) -> dict:
    return {"id": r.id, "length": len(r.name)}

@flow
def pipeline(record: Record):
    return process(clean(record))

pipeline(Record(id=1, name=" Alice ", city=" Paris "))
Tip: If you set default result storage (e.g., a GCS block) on your deployment, persisted JSON results go to that storage automatically.
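If you prefer to set this in code rather than on the deployment, here is a minimal sketch (reusing Record, clean, and process from example 3; it assumes prefect-gcp is installed, a GcsBucket block named my-gcs-results exists, and that tasks inherit the flow's result storage unless they override it):
```
from prefect import flow
from prefect_gcp.cloud_storage import GcsBucket

# Persisted task results default to this bucket for the whole flow run
@flow(result_storage=GcsBucket.load("my-gcs-results"))
def pipeline_gcs(record: Record):
    return process(clean(record))
```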
4) Big data: store in GCS and pass URIs
```
from dataclasses import dataclass
from google.cloud import storage
from prefect import flow, task

@dataclass
class ArtifactRef:
    bucket: str
    path: str  # e.g. "raw/myfile.json"

def _client():
    return storage.Client()

@task
def load_from_gcs(ref: ArtifactRef) -> str:
    client = _client()
    # download the blob's contents as text (reconstructed ending)
    return client.bucket(ref.bucket).blob(ref.path).download_as_text()
```
Marvin
09/23/2025, 9:40 AM

Florent VanDeMoortele
09/23/2025, 10:25 AM

Marvin
09/23/2025, 10:25 AM

Marvin
09/23/2025, 10:26 AM
- Plain dataclass (works for in-process passing):
from dataclasses import dataclass
from prefect import flow, task

@dataclass
class Record:
    id: int
    payload: dict

@task
def clean(r: Record) -> Record:
    r.payload = {k: v for k, v in r.payload.items() if v is not None}
    return r

@task
def process(r: Record) -> dict:
    return {"id": r.id, "num_keys": len(r.payload)}

@flow
def pipeline(r: Record):
    return process(clean(r))

# pipeline(Record(id=1, payload={"a": 1, "b": None}))
- Pydantic + JSON persistence (good for Prefect Cloud + GCS):
from pydantic import BaseModel
from prefect import flow, task
from prefect.serializers import JSONSerializer
from prefect_gcp.cloud_storage import GcsBucket

# Create and configure this block in the UI once, then load it here
gcs = GcsBucket.load("my-gcs-results")

class Record(BaseModel):
    id: int
    payload: dict

@task(persist_result=True, result_storage=gcs, result_serializer=JSONSerializer())
def clean(r: Record) -> Record:
    return Record(id=r.id, payload={k: v for k, v in r.payload.items() if v is not None})

@task(persist_result=True, result_storage=gcs, result_serializer=JSONSerializer())
def process(r: Record) -> dict:
    return {"id": r.id, "num_keys": len(r.payload)}

@flow
def pipeline(r: Record):
    return process(clean(r))
- If you later need to offload to GCS and pass only references:
from dataclasses import dataclass
from google.cloud import storage
from prefect import flow, task

@dataclass
class ArtifactRef:
    bucket: str
    path: str

@task
def write_to_gcs(bucket: str, path: str, data: bytes) -> ArtifactRef:
    client = storage.Client()
    client.bucket(bucket).blob(path).upload_from_string(data)
    return ArtifactRef(bucket, path)

@task
def read_from_gcs(ref: ArtifactRef) -> bytes:
    client = storage.Client()
    return client.bucket(ref.bucket).blob(ref.path).download_as_bytes()

# Pass ArtifactRef between tasks; size stays tiny even if payload is large
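A minimal sketch of wiring these two tasks together (the bucket name and path are placeholders); only the tiny ArtifactRef crosses the task boundary:
```
from prefect import flow

@flow
def transfer(bucket: str):
    ref = write_to_gcs.submit(bucket, "tmp/payload.json", b'{"a": 1}')
    data = read_from_gcs.submit(ref)  # Prefect resolves the upstream future
    return data.result()
```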
Notes and gotchas
- Don’t pass open clients/sockets between tasks; re-create per task.
- For distributed runners (e.g., Dask), 10 MB is fine, but each task edge will serialize/transfer it—watch for fan-out/fan-in patterns.
- If you persist results, JSON via Pydantic is safer than pickled custom classes across code changes.
Docs and references
- Results and persistence in Prefect 2: Results (Prefect docs)
- GCS block and usage: prefect-gcp: Cloud Storage
If you tell me whether you're running tasks locally vs. on Dask/Ray/Kubernetes and whether you want persisted results, I can tailor this to your exact setup.

Florent VanDeMoortele
09/23/2025, 2:12 PM

Marvin
09/23/2025, 2:12 PM

Marvin
09/23/2025, 2:14 PM
- Use .result() (singular) on a PrefectFuture to get the Python object back. If your task returned a dataclass, you'll get that dataclass instance (assuming it was serializable).
- For nested containers of futures (dicts/lists of futures), use prefect.futures.resolve_futures_to_results(...) to resolve everything in one shot.
- If you persist results and use pickle, your dataclass is reconstructed automatically (as long as the class is top-level importable and code versions match). If you use JSON, you’ll get a dict back unless you explicitly reconstruct the dataclass.
Quick examples
1) Basic: dataclass + future.result()
from dataclasses import dataclass
from prefect import task, flow

@dataclass
class Record:
    id: int
    payload: dict

@task
def make_record() -> Record:
    return Record(id=1, payload={"a": 1})

@flow
def run():
    fut = make_record.submit()
    rec = fut.result()  # Record(id=1, payload={'a': 1})
    return rec
2) Nested futures: resolve in one call
from prefect.futures import resolve_futures_to_results

@flow
def run_many():
    futures = {
        "first": make_record.submit(),
        "batch": [make_record.submit() for _ in range(3)],
    }
    resolved = resolve_futures_to_results(futures)
    # resolved["first"] is a Record
    # resolved["batch"] is a list[Record]
    return resolved
3) Persisting results with pickle (reconstructs dataclass automatically)
from prefect import task, flow
from prefect_gcp.cloud_storage import GcsBucket
from prefect.serializers import PickleSerializer

gcs = GcsBucket.load("my-gcs-results")

@task(persist_result=True, result_storage=gcs, result_serializer=PickleSerializer())
def compute() -> Record:
    return Record(id=42, payload={"k": "v"})

@flow
def run():
    fut = compute.submit()
    rec = fut.result()  # full Record object
    return rec
4) Persisting with JSON (you must reconstruct the dataclass)
- Option A: return dicts and reconstruct later
from dataclasses import asdict
from prefect.serializers import JSONSerializer

@task(persist_result=True, result_storage=gcs, result_serializer=JSONSerializer())
def compute_dict() -> dict:
    rec = Record(id=42, payload={"k": "v"})
    return asdict(rec)

@task
def reconstruct(d: dict) -> Record:
    return Record(**d)

@flow
def run():
    fut = compute_dict.submit()
    d = fut.result()  # dict from JSON
    rec = reconstruct(d)  # back to Record
    return rec
- Option B (recommended): use Pydantic models instead of dataclasses if you want JSON persistence with automatic reconstruction.
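A minimal sketch of Option B, assuming Prefect 2's default JSON object encoder records the model's import path so the instance can be rebuilt on load (the model and task names are illustrative):
```
from prefect import flow, task
from prefect.serializers import JSONSerializer
from pydantic import BaseModel

class Record(BaseModel):
    id: int
    payload: dict

@task(persist_result=True, result_serializer=JSONSerializer())
def compute_model() -> Record:
    return Record(id=42, payload={"k": "v"})

@flow
def run_pydantic():
    fut = compute_model.submit()
    return fut.result()  # a Record instance, not a plain dict
```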
Common pitfalls and tips
- Always define your dataclass at module top-level (not inside a function) so it’s picklable.
- If you see .results() in examples, that's not a Prefect 2 method; use .result(). For containers of futures, use resolve_futures_to_results(...).
- For distributed runs or after retries where results are loaded from storage, make sure the class definition is available (same import path) on the worker.
- If your dataclass contains non-serializable fields (e.g., a client), exclude them from the dataclass or mark them with defaults and recreate the client inside the task, as in the sketch below.
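A minimal sketch of that last point (Job and run_job are illustrative names): keep only serializable reference data on the dataclass and rebuild the client inside the task:
```
from dataclasses import dataclass
from google.cloud import storage
from prefect import task

@dataclass
class Job:
    bucket: str
    path: str
    # no client field here: live connections aren't picklable

@task
def run_job(job: Job) -> bytes:
    client = storage.Client()  # recreate the client on each task run
    return client.bucket(job.bucket).blob(job.path).download_as_bytes()
```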
Handy references
- Results and persistence: Results (Prefect docs)
- Futures utilities: prefect.futures API
- GCS result storage: prefect-gcp: Cloud Storage
If you share how you're persisting results (pickle vs JSON) and whether you have nested futures, I can tailor the exact pattern (and help choose between dataclass vs Pydantic).

Florent VanDeMoortele
09/23/2025, 2:36 PM
RuntimeWarning: coroutine 'resolve_futures_to_data' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Marvin
09/23/2025, 2:36 PM

Marvin
09/23/2025, 2:37 PM
- You're likely calling resolve_futures_to_data (a Prefect 3.x API) in a Prefect 2 project. That function is async, so calling it without await yields "coroutine was never awaited".
- In Prefect 2, the equivalent is resolve_futures_to_results, and it is synchronous.
Fix
- Use the Prefect 2 helper and don’t await it:
from prefect.futures import resolve_futures_to_results
resolved = resolve_futures_to_results(my_container_of_futures)
- For a single future, use .result() (singular), not `.results()`:
rec = future.result()
Minimal example that preserves your complex dataclass
from dataclasses import dataclass
from prefect import task, flow
from prefect.futures import resolve_futures_to_results

@dataclass
class Record:
    id: int
    payload: dict

@task
def build(i: int) -> Record:
    return Record(i, {"k": i})

@flow
def main():
    futs = {"one": build.submit(1), "many": [build.submit(i) for i in range(2, 5)]}
    data = resolve_futures_to_results(futs)  # no await needed in Prefect 2
    # data["one"] is a Record; data["many"] is list[Record]
    return data
If you really are on Prefect 3.x, then do:
from prefect.futures import resolve_futures_to_data
resolved = await resolve_futures_to_data(my_container_of_futures)
But since you said you're on Prefect 2 (Cloud), the Prefect 2 version above is the right one. If the warning persists, please share the import and the line where you call the resolver so I can pinpoint it. Links: prefect.futures API

Florent VanDeMoortele
09/23/2025, 2:42 PM

Marvin
09/23/2025, 2:42 PM

Marvin
09/23/2025, 2:43 PM
resolve_futures_to_data can exist in some Prefect 2 installs, but it's an async coroutine. The "coroutine was never awaited" warning means it was called from a synchronous context without await.
You have two safe options in Prefect 2:
Option A (simplest): use the synchronous helper
from prefect.futures import resolve_futures_to_results
resolved = resolve_futures_to_results(my_container_of_futures)
- Works in sync flows/tasks.
- Returns your original Python objects (e.g., dataclass instances), just like calling .result() on each future.
Option B: make the call from an async context and await it
- Async flow:
from prefect import flow
from prefect.futures import resolve_futures_to_data

@flow
async def main():
    futures = {...}
    resolved = await resolve_futures_to_data(futures)
    return resolved
- Async task:
from prefect import task
from prefect.futures import resolve_futures_to_data

@task
async def gather_all(container):
    return await resolve_futures_to_data(container)
- If you’re outside of Prefect (plain script), you can do:
import asyncio
resolved = asyncio.run(resolve_futures_to_data(my_container_of_futures))
Notes
- For a single future, use .result() (singular), not .results().
- If you persist results with JSON, dataclasses will round-trip as dicts; reconstruct with Record(**d) or switch to Pydantic for automatic JSON (de)serialization.
- If you're in a sync @flow and don't want to convert to async, stick with resolve_futures_to_results.
Sanity check (optional)
import prefect, prefect.futures as pf
print(prefect.__version__)
print([n for n in dir(pf) if "resolve_futures" in n])
Docs
- Futures API: prefect.futures API
If you share the exact snippet where you call the resolver (and whether it's inside a flow/task), I can suggest the minimal change to remove the warning.

Florent VanDeMoortele
09/23/2025, 2:45 PM

Marvin
09/23/2025, 2:45 PM

Marvin
09/23/2025, 2:46 PM
It sounds like your build doesn't expose resolve_futures_to_results. Here are reliable patterns that work today:
What to use instead
- Single future: use .result() (singular)
val = fut.result()
- Resolve a container of futures (sync-safe, works in any 2.20 flow/task)
from collections.abc import Mapping
from prefect.futures import PrefectFuture

def resolve_all(obj):
    if isinstance(obj, PrefectFuture):
        return obj.result()
    if isinstance(obj, Mapping):
        return type(obj)({k: resolve_all(v) for k, v in obj.items()})
    if isinstance(obj, (list, tuple, set)):
        seq = [resolve_all(v) for v in obj]
        return tuple(seq) if isinstance(obj, tuple) else type(obj)(seq)
    return obj

# Usage:
resolved = resolve_all(my_container_of_futures)
- If you already have a flat list of futures and just want to wait, then collect:
from prefect.futures import wait

wait(list_of_futures)  # ensure completion
results = [f.result() for f in list_of_futures]
- Stream results as they complete:
from prefect.futures import as_completed

for f in as_completed(list_of_futures):
    item = f.result()
    ...
About resolve_futures_to_data
- In some Prefect 2 builds, resolve_futures_to_data exists but is async. If you use it, you must await it from an async flow/task:
from prefect import flow
from prefect.futures import resolve_futures_to_data

@flow
async def main():
    futures = {...}
    data = await resolve_futures_to_data(futures)
    return data
- In a synchronous flow, don't call it directly (that's what triggered "coroutine was never awaited"). Use the resolve_all helper above instead.
Complex dataclasses
- If your tasks return dataclass instances, .result() gives you the dataclass back (assuming you didn't persist with JSON). If you persist with JSON, you'll get a dict and should reconstruct the dataclass, e.g., Record(**d), or use Pydantic models for automatic JSON (de)serialization.
Sanity check what’s available in your env
import prefect, prefect.futures as pf
print(prefect.__version__)
print([n for n in dir(pf) if "resolve_futures" in n])
References
- Futures utilities (API): prefect.futures API
If you paste the snippet where you're resolving your futures (and whether your flow is sync or async), I can give you the exact minimal change.

Florent VanDeMoortele
09/23/2025, 2:54 PM

Marvin
09/23/2025, 2:54 PM

Marvin
09/23/2025, 2:55 PM
Short answer: prefer .submit() over .map in Prefect 2.
Why
- Prefect 2’s recommended fan-out pattern is explicit submission:
- You get clear control over batching, backpressure, and error handling.
- Works the same in sync or async flows and across task runners (threads, Dask, Ray).
- The "mapped tasks" pattern from Prefect 1.x isn't the primary API in 2.x. Even if you see a .map on some objects, it's not the common/idiomatic approach in 2.x and can be confusing.
Patterns to use
- Fan-out/fan-in
from prefect import flow, task

@task
def process(x: int) -> int:
    return x * 2

@flow
def main(items: list[int]):
    futs = [process.submit(x) for x in items]
    results = [f.result() for f in futs]
    return results
- Stream results as they complete
from prefect.futures import as_completed

@flow
def main(items: list[int]):
    futs = [process.submit(x) for x in items]
    for f in as_completed(futs):
        r = f.result()
        # handle r (e.g., write to GCS) before the rest finish
- Batch to avoid thousands of tiny tasks
def chunks(seq, n):
    for i in range(0, len(seq), n):
        yield seq[i:i+n]

@task
def process_batch(batch: list[int]) -> list[int]:
    return [x * 2 for x in batch]  # process a whole chunk in one task run

@flow
def main(items: list[int], batch_size: int = 100):
    batch_futs = [process_batch.submit(batch) for batch in chunks(items, batch_size)]
    # gather or stream results from batch_futs
Tips
- Use a task runner to get concurrency:
- Threads (default ConcurrentTaskRunner) work well for I/O-bound tasks.
- For more scale, use Dask/Ray task runners. See Task runners and the sketch after this list.
- If you need to resolve nested containers of futures, either:
- Call .result() on each future (as above), or
- If you have resolve_futures_to_data in 2.20 and are in an async flow, await it; otherwise write a tiny resolver that recursively calls .result().
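As referenced above, a minimal task-runner sketch; ConcurrentTaskRunner ships with Prefect 2, while DaskTaskRunner assumes the separate prefect-dask package:
```
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
# from prefect_dask import DaskTaskRunner  # larger scale; requires prefect-dask

@task
def fetch(x: int) -> int:
    return x * 2

# Threads suit I/O-bound tasks; swap in DaskTaskRunner() for distributed runs
@flow(task_runner=ConcurrentTaskRunner())
def main(items: list[int]):
    futs = [fetch.submit(x) for x in items]
    return [f.result() for f in futs]
```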
If you share your task runner (threads vs Dask/Ray) and the typical number of items you fan out, I can suggest a batching/concurrency strategy tuned to your workload.

Florent VanDeMoortele
09/23/2025, 3:00 PM

Marvin
09/23/2025, 3:00 PM

Marvin
09/23/2025, 3:01 PM
So as_completed isn't available in your build. Here are solid alternatives that work in 2.20 today.
Option 1: Simple fan-out/fan-in (sync)
futs = [task_fn.submit(x) for x in items]

# optionally ensure they've all finished
from prefect.futures import wait
wait(futs)

results = [f.result() for f in futs]
Option 2: Stream results as they finish (polyfill using wait)
from time import sleep
from prefect.futures import wait

def iter_as_completed(futures, poll=0.5):
    pending = set(futures)
    while pending:
        done_and_not = wait(list(pending), timeout=poll)
        for f in done_and_not.done:
            yield f
        pending = done_and_not.not_done
        if pending and poll:
            sleep(poll)

# Usage
futs = [task_fn.submit(x) for x in items]
for f in iter_as_completed(futs):
    result = f.result()
    # handle result incrementally
Option 3: Resolve nested containers of futures (sync)
from collections.abc import Mapping
from prefect.futures import PrefectFuture

def resolve_all(obj):
    if isinstance(obj, PrefectFuture):
        return obj.result()
    if isinstance(obj, Mapping):
        return type(obj)({k: resolve_all(v) for k, v in obj.items()})
    if isinstance(obj, (list, tuple, set)):
        seq = [resolve_all(v) for v in obj]
        return tuple(seq) if isinstance(obj, tuple) else type(obj)(seq)
    return obj

# resolved = resolve_all({"a": fut_a, "b": [fut_b1, fut_b2]})
Async flows (if you have resolve_futures_to_data)
- If resolve_futures_to_data is present in your 2.20 install, it's async, so you must await it from an async `@flow`/`@task`:
from prefect import flow
from prefect.futures import resolve_futures_to_data

@flow
async def main():
    futures = {...}
    data = await resolve_futures_to_data(futures)
    return data
Sanity check in your env
import prefect, prefect.futures as pf
print(prefect.__version__)
print([n for n in dir(pf) if "as_completed" in n or "resolve_futures" in n])
Docs: prefect.futures API
If you share a short snippet of how you’re submitting and collecting, I can tailor the minimal changes for your exact pattern.