<@ULVA73B9P> I work with Prefect 2. I have a data ...
# ask-marvin
f
@Marvin I work with Prefect 2. I have a data dictionary, that I want to pass through tasks to clean, process and get results from these raw data. What's the best object to pass through tasks : a Python dict? A Python dataclass? another format?
m
thought for 443.3 seconds
Great question. In Prefect 2 the “best” object depends on how your tasks run and how big your payloads are. Here’s a practical guide.

General guidance
- Small to medium payloads (KBs to a few MBs): passing Python objects directly is fine. Prefect uses cloudpickle under the hood to move results across task boundaries. Keep objects simple and top-level (no lambdas/inner classes).
- Large payloads (dataframes, large arrays, images, >10s of MBs): don’t pass them between tasks. Store them in object storage (GCS in your case) and pass lightweight references (URIs, metadata) between tasks.
- Validation and stability: prefer a structured, typed model so you catch mistakes early and can serialize to JSON when persisting results.

What to pass between tasks
- Python dict
  - Pros: simple, zero ceremony.
  - Cons: no validation; easy to mistype keys; evolving schemas can be brittle.
  - Use when: quick glue, small payloads, not persisting across runs.
- Dataclass
  - Pros: typed, lightweight, picklable, easy to use.
  - Cons: no built-in validation; if you persist pickled results, the class definition must remain stable across versions.
  - Use when: you want structure and type hints but don’t need validation or JSON interop.
- Pydantic model
  - Pros: strong validation, easy JSON (de)serialization, great for result persistence, safer schema evolution.
  - Cons: a bit more overhead than dataclasses.
  - Use when: you want robust schemas, plan to persist results, or want JSON-friendly artifacts.

Recommended pattern
- For small/medium “control-plane” data: use a dataclass or Pydantic model.
- For large “data-plane” objects: write to GCS and pass a small model that contains URIs + metadata.

Examples

1) Minimal: pass a dict
Copy code
from prefect import flow, task

@task
def clean(d: dict) -> dict:
    d = {k: v.strip() if isinstance(v, str) else v for k, v in d.items()}
    return d

@task
def process(d: dict) -> dict:
    d["length"] = len(d)
    return d

@flow
def pipeline(data: dict):
    cleaned = clean(data)
    return process(cleaned)

pipeline({"name": " Alice ", "city": " Paris "})
2) Structured: pass a dataclass
Copy code
from dataclasses import dataclass
from typing import Optional
from prefect import flow, task

@dataclass
class Record:
    id: int
    name: str
    city: Optional[str] = None

@task
def clean(r: Record) -> Record:
    return Record(
        id=r.id,
        name=r.name.strip(),
        city=r.city.strip() if r.city else None,
    )

@task
def process(r: Record) -> dict:
    return {"id": r.id, "length": len(r.name)}

@flow
def pipeline(record: Record):
    return process(clean(record))

pipeline(Record(id=1, name=" Alice ", city=" Paris "))
3) Robust + persistence-friendly: Pydantic + JSON results
Copy code
from pydantic import BaseModel, Field
from prefect import flow, task
from prefect.serializers import JSONSerializer

class Record(BaseModel):
    id: int
    name: str = Field(min_length=1)
    city: str | None = None

@task(persist_result=True, result_serializer=JSONSerializer())
def clean(r: Record) -> Record:
    return Record(
        id=r.id,
        name=r.name.strip(),
        city=r.city.strip() if r.city else None,
    )

@task(persist_result=True, result_serializer=JSONSerializer())
def process(r: Record) -> dict:
    return {"id": r.id, "length": len(r.name)}

@flow
def pipeline(record: Record):
    return process(clean(record))

pipeline(Record(id=1, name=" Alice ", city=" Paris "))
Tip: If you set default result storage (e.g., a GCS block) on your deployment, persisted JSON results go to that storage automatically.

4) Big data: store in GCS and pass URIs
```
from dataclasses import dataclass
from google.cloud import storage
from prefect import flow, task

@dataclass
class ArtifactRef:
    bucket: str
    path: str  # e.g. "raw/myfile.json"

def _client():
    return storage.Client()

@task
def load_from_gcs(ref: ArtifactRef) -> str:
    client = _client()
    blob = client.bucket(ref.bucket).blob(ref.path)
    return blob.download_as_text()

@task
def write_to_gcs(bucket: str, path: str, data: str) -> ArtifactRef:
    client = _client()
    blob = client.bucket(bucket).blob(path)
    blob.upload_from_string(data)
    return ArtifactRef(bucket=bucket, path=path)

@task
def clean_text(data: str) -> str:
    return data.strip().lower()

@flow
def pipeline(input_ref: ArtifactRef, output_bucket: str, output_path: str) -> ArtifactRef:
    raw = load_from_gcs(input_ref)
    cleaned = clean_text(raw)
    return write_to_gcs(output_bucket, output_path, cleaned)

# pipeline(ArtifactRef(bucket="my-bucket", path="raw/x.json"), "my-bucket", "clean/x.json")
```
This keeps task payloads small and avoids serializing large objects in memory.

Common pitfalls to avoid
- Don’t pass open file handles, DB connections, or client objects between tasks.
- Be mindful of size: large Python objects increase serialization time and memory. Prefer GCS for heavy data.
- If you persist results, avoid pickling custom classes unless you control versioning; JSON via Pydantic is safer.

If you share a bit more about:
- the approximate size of your “data dictionary”
- whether your tasks run in a single process or on distributed workers (e.g., Kubernetes + Dask/Ray)
- whether you need persisted results across retries/resumptions
…I can recommend a precise pattern and help sketch the deployment/storage setup for GCS.
f
@Marvin my object is < 10MB
m
thought for 92.7 seconds
Thanks—that size helps.

Short answer
- For objects under ~10 MB, it’s perfectly fine to pass them directly between Prefect 2 tasks.
- Use a dataclass for simple structure and type hints, or a Pydantic model if you want validation and easy JSON persistence.
- If you start fanning out across many tasks or remote workers and see overhead, pivot to storing the payload in GCS and passing a small reference (URI + metadata).

How to choose
- Dataclass: fastest to work with, typed, picklable. Great if you’re not persisting results and you control both ends.
- Pydantic model: adds validation and easy JSON (de)serialization—ideal if you want to persist results or keep a stable schema over time.

If you plan to persist results (retries/resumes or long-lived artifacts), prefer Pydantic + JSON and a GCS result store.

Examples
- Dataclass between tasks (no persistence):
Copy code
from dataclasses import dataclass
from prefect import flow, task

@dataclass
class Record:
    id: int
    payload: dict

@task
def clean(r: Record) -> Record:
    r.payload = {k: v for k, v in r.payload.items() if v is not None}
    return r

@task
def process(r: Record) -> dict:
    return {"id": r.id, "num_keys": len(r.payload)}

@flow
def pipeline(r: Record):
    return process(clean(r))

# pipeline(Record(id=1, payload={"a": 1, "b": None}))
- Pydantic + JSON persistence (good for Prefect Cloud + GCS):
Copy code
from pydantic import BaseModel
from prefect import flow, task
from prefect.serializers import JSONSerializer
from prefect_gcp.cloud_storage import GcsBucket

# Create and configure this block in the UI once, then load here
gcs = GcsBucket.load("my-gcs-results")

class Record(BaseModel):
    id: int
    payload: dict

@task(persist_result=True, result_storage=gcs, result_serializer=JSONSerializer())
def clean(r: Record) -> Record:
    return Record(id=r.id, payload={k: v for k, v in r.payload.items() if v is not None})

@task(persist_result=True, result_storage=gcs, result_serializer=JSONSerializer())
def process(r: Record) -> dict:
    return {"id": r.id, "num_keys": len(r.payload)}

@flow
def pipeline(r: Record):
    return process(clean(r))
- If you later need to offload to GCS and pass only references:
Copy code
from dataclasses import dataclass
from google.cloud import storage
from prefect import flow, task

@dataclass
class ArtifactRef:
    bucket: str
    path: str

@task
def write_to_gcs(bucket: str, path: str, data: bytes) -> ArtifactRef:
    client = storage.Client()
    client.bucket(bucket).blob(path).upload_from_string(data)
    return ArtifactRef(bucket, path)

@task
def read_from_gcs(ref: ArtifactRef) -> bytes:
    client = storage.Client()
    return client.bucket(ref.bucket).blob(ref.path).download_as_bytes()

# Pass ArtifactRef between tasks; size stays tiny even if payload is large
Notes and gotchas
- Don’t pass open clients/sockets between tasks; re-create them per task.
- For distributed runners (e.g., Dask), 10 MB is fine, but each task edge will serialize/transfer it—watch for fan-out/fan-in patterns (a quick way to gauge that cost is sketched below).
- If you persist results, JSON via Pydantic is safer than pickled custom classes across code changes.

Docs and references
- Results and persistence in Prefect 2: Results (Prefect docs)
- GCS block and usage: prefect-gcp: Cloud Storage

If you tell me whether you’re running tasks locally vs. on Dask/Ray/Kubernetes and whether you want persisted results, I can tailor this to your exact setup.
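To gauge that serialization cost before fanning out, a minimal sketch (assuming cloudpickle is importable in your environment; Prefect 2 uses it to move results, as noted earlier in this thread):
```
import cloudpickle

def payload_size_mb(obj) -> float:
    # Rough proxy for what each task edge will serialize and transfer
    return len(cloudpickle.dumps(obj)) / 1_000_000

# print(payload_size_mb(my_data_dict))
```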
f
@Marvin how can I deal with PrefectFuture and .results() in case of complex dataclass?
m
thought for 113.2 seconds
Great question. A few key points for Prefect 2:
- Use `.result()` (singular) on a `PrefectFuture` to get the Python object back. If your task returned a dataclass, you’ll get that dataclass instance (assuming it was serializable).
- For nested containers of futures (dicts/lists of futures), use `prefect.futures.resolve_futures_to_results(...)` to resolve everything in one shot.
- If you persist results and use pickle, your dataclass is reconstructed automatically (as long as the class is top-level importable and code versions match). If you use JSON, you’ll get a dict back unless you explicitly reconstruct the dataclass.

Quick examples

1) Basic: dataclass + future.result()
Copy code
from dataclasses import dataclass
from prefect import task, flow

@dataclass
class Record:
    id: int
    payload: dict

@task
def make_record() -> Record:
    return Record(id=1, payload={"a": 1})

@flow
def run():
    fut = make_record.submit()
    rec = fut.result()  # Record(id=1, payload={'a': 1})
    return rec
2) Nested futures: resolve in one call
Copy code
from prefect import flow
from prefect.futures import resolve_futures_to_results

@flow
def run_many():
    futures = {
        "first": make_record.submit(),
        "batch": [make_record.submit() for _ in range(3)],
    }
    resolved = resolve_futures_to_results(futures)
    # resolved["first"] is a Record
    # resolved["batch"] is a list[Record]
    return resolved
3) Persisting results with pickle (reconstructs dataclass automatically)
Copy code
from prefect import task, flow
from prefect_gcp.cloud_storage import GcsBucket
from prefect.serializers import PickleSerializer

gcs = GcsBucket.load("my-gcs-results")

@task(persist_result=True, result_storage=gcs, result_serializer=PickleSerializer())
def compute() -> Record:
    return Record(id=42, payload={"k": "v"})

@flow
def run():
    fut = compute.submit()
    rec = fut.result()  # Full Record object
4) Persisting with JSON (you must reconstruct the dataclass) - Option A: return dicts and reconstruct later
Copy code
from dataclasses import asdict
from prefect import task, flow
from prefect.serializers import JSONSerializer

@task(persist_result=True, result_storage=gcs, result_serializer=JSONSerializer())
def compute_dict() -> dict:
    rec = Record(id=42, payload={"k": "v"})
    return asdict(rec)

@task
def reconstruct(d: dict) -> Record:
    return Record(**d)

@flow
def run():
    fut = compute_dict.submit()
    d = fut.result()           # dict from JSON
    rec = reconstruct(d)       # back to Record
- Option B (recommended): use Pydantic models instead of dataclasses if you want JSON persistence with automatic reconstruction.

Common pitfalls and tips
- Always define your dataclass at module top-level (not inside a function) so it’s picklable.
- If you see `.results()` in examples, that’s not a Prefect 2 method—use `.result()`. For containers of futures, use `resolve_futures_to_results(...)`.
- For distributed runs or after retries where results are loaded from storage, make sure the class definition is available (same import path) on the worker.
- If your dataclass contains non-serializable fields (e.g., a client), exclude them from the dataclass or mark them with defaults and recreate them inside the task (a minimal sketch follows below).

Handy references
- Results and persistence: Results (Prefect docs)
- Futures utilities: prefect.futures API
- GCS result storage: prefect-gcp: Cloud Storage

If you share how you’re persisting results (pickle vs JSON) and whether you have nested futures, I can tailor the exact pattern (and help choose between dataclass vs Pydantic).
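On that last tip, a minimal sketch of the exclude-and-recreate pattern (the `GcsJob` and `download` names are illustrative, not from your code):
```
from dataclasses import dataclass, field
from typing import Optional
from google.cloud import storage
from prefect import task

@dataclass
class GcsJob:
    bucket: str
    path: str
    # Keep the live client out of the payload: default None and excluded from
    # comparisons, so the dataclass stays picklable across task boundaries.
    client: Optional[storage.Client] = field(default=None, compare=False, repr=False)

@task
def download(job: GcsJob) -> str:
    # Re-create the client inside the task instead of passing an open one around
    client = job.client or storage.Client()
    return client.bucket(job.bucket).blob(job.path).download_as_text()
```
Because `client` is never populated before the dataclass crosses a task edge, only plain strings travel between tasks.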
f
@Marvin I obtain this error:
Copy code
RuntimeWarning: coroutine 'resolve_futures_to_data' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
m
thought for 59.5 seconds
That warning means you called an async function without awaiting it. In Prefect 2, the function you want is synchronous and differently named.

What likely happened
- You used `resolve_futures_to_data` (Prefect 3.x API) in a Prefect 2 project. That function is async, so calling it without `await` yields “coroutine was never awaited”.
- In Prefect 2, the equivalent is `resolve_futures_to_results` and it is synchronous.

Fix
- Use the Prefect 2 helper and don’t await it:
Copy code
from prefect.futures import resolve_futures_to_results

resolved = resolve_futures_to_results(my_container_of_futures)
- For a single future, use `.result()` (singular), not `.results()`:
Copy code
rec = future.result()
Minimal example that preserves your complex dataclass
Copy code
from dataclasses import dataclass
from prefect import task, flow
from prefect.futures import resolve_futures_to_results

@dataclass
class Record:
    id: int
    payload: dict

@task
def build(i: int) -> Record:
    return Record(i, {"k": i})

@flow
def main():
    futs = {"one": build.submit(1), "many": [build.submit(i) for i in range(2, 5)]}
    data = resolve_futures_to_results(futs)  # no await needed in Prefect 2
    # data["one"] is a Record; data["many"] is list[Record]
    return data
If you really are on Prefect 3.x, then do:
Copy code
from prefect.futures import resolve_futures_to_data

resolved = await resolve_futures_to_data(my_container_of_futures)
But since you said you’re on Prefect 2 (Cloud), the Prefect 2 version above is the right one. If the warning persists, please share the import and the line where you call the resolver so I can pinpoint it.

Links: prefect.futures API
f
@Marvin from prefect.futures import resolve_futures_to_data exists in Prefect 2
m
thought for 63.2 seconds
You’re right that `resolve_futures_to_data` can exist in some Prefect 2 installs—but it’s an async coroutine. The “coroutine was never awaited” warning means it was called from a synchronous context without `await`. You have two safe options in Prefect 2.

Option A (simplest): use the synchronous helper
Copy code
from prefect.futures import resolve_futures_to_results

resolved = resolve_futures_to_results(my_container_of_futures)
- Works in sync flows/tasks.
- Returns your original Python objects (e.g., dataclass instances) just like calling `.result()` on each future.

Option B: make the call from an async context and await it
- Async flow:
Copy code
from prefect import flow
from prefect.futures import resolve_futures_to_data

@flow
async def main():
    futures = {...}
    resolved = await resolve_futures_to_data(futures)
    return resolved
- Async task:
Copy code
from prefect import task
from prefect.futures import resolve_futures_to_data

@task
async def gather_all(container):
    return await resolve_futures_to_data(container)
- If you’re outside of Prefect (plain script), you can do:
Copy code
import asyncio
resolved = asyncio.run(resolve_futures_to_data(my_container_of_futures))
Notes
- For a single future, use `.result()` (singular), not `.results()`.
- If you persist results with JSON, dataclasses will round-trip as dicts; reconstruct with `Record(**d)` or switch to Pydantic for automatic JSON (de)serialization (a short sketch follows the sanity check below).
- If you’re in a sync `@flow` and don’t want to convert to async, stick with `resolve_futures_to_results`.

Sanity check (optional)
Copy code
import prefect, prefect.futures as pf
print(prefect.__version__)
print([n for n in dir(pf) if "resolve_futures" in n])
Docs
- Futures API: prefect.futures API

If you share the exact snippet where you call the resolver (and whether it’s inside a flow/task), I can suggest the minimal change to remove the warning.
f
@Marvin resolve_futures_to_results doesn't exists in Prefect 2.20
m
thought for 85.7 seconds
Got it — on Prefect 2.20, you may not have `resolve_futures_to_results`. Here are reliable patterns that work today.

What to use instead
- Single future: use `.result()` (singular)
Copy code
val = fut.result()
- Resolve a container of futures (sync-safe, works in any 2.20 flow/task)
Copy code
from collections.abc import Mapping
from prefect.futures import PrefectFuture

def resolve_all(obj):
    if isinstance(obj, PrefectFuture):
        return obj.result()
    if isinstance(obj, Mapping):
        return type(obj)({k: resolve_all(v) for k, v in obj.items()})
    if isinstance(obj, (list, tuple, set)):
        seq = [resolve_all(v) for v in obj]
        return tuple(seq) if isinstance(obj, tuple) else type(obj)(seq)
    return obj

# Usage:
resolved = resolve_all(my_container_of_futures)
- If you already have a flat list of futures and just want to wait, then collect:
Copy code
from prefect.futures import wait

wait(list_of_futures)  # ensure completion
results = [f.result() for f in list_of_futures]
- Stream results as they complete:
Copy code
from prefect.futures import as_completed

for f in as_completed(list_of_futures):
    item = f.result()
    ...
About resolve_futures_to_data
- In some Prefect 2 builds, `resolve_futures_to_data` exists but is async. If you use it, you must await it from an async flow/task:
Copy code
from prefect import flow
from prefect.futures import resolve_futures_to_data

@flow
async def main():
    futures = {...}
    data = await resolve_futures_to_data(futures)
    return data
- In a synchronous flow, don’t call it directly (that’s what triggered “coroutine was never awaited”). Use the `resolve_all` helper above instead.

Complex dataclasses
- If your tasks return dataclass instances, `.result()` gives you the dataclass back (assuming you didn’t persist with JSON). If you persist with JSON, you’ll get a dict and should reconstruct the dataclass, e.g., `Record(**d)`, or use Pydantic models for automatic JSON (de)serialization.

Sanity check what’s available in your env
Copy code
import prefect, prefect.futures as pf
print(prefect.__version__)
print([n for n in dir(pf) if "resolve_futures" in n])
References
- Futures utilities (API): prefect.futures API

If you paste the snippet where you’re resolving your futures (and whether your flow is sync or async), I can give you the exact minimal change.
f
@Marvin is it a good practice to use .map in Prefect? Or should I prefer Python native loop?
m
thought for 44.0 seconds
Short answer: Prefer native Python loops/comprehensions with `.submit()` over `.map` in Prefect 2.

Why
- Prefect 2’s recommended fan-out pattern is explicit submission:
  - You get clear control over batching, backpressure, and error handling (an error-handling sketch follows the patterns below).
  - Works the same in sync or async flows and across task runners (threads, Dask, Ray).
- The “mapped tasks” pattern from Prefect 1.x isn’t the primary API in 2.x. Even if you see a `.map` on some objects, it’s not the common/idiomatic approach in 2.x and can be confusing.

Patterns to use
- Fan-out/fan-in
Copy code
from prefect import flow, task

@task
def process(x: int) -> int:
    return x * 2

@flow
def main(items: list[int]):
    futs = [process.submit(x) for x in items]
    results = [f.result() for f in futs]
    return results
- Stream results as they complete
Copy code
from prefect.futures import as_completed

@flow
def main(items: list[int]):
    futs = [process.submit(x) for x in items]
    for f in as_completed(futs):
        r = f.result()
        # handle r (e.g., write to GCS) before the rest finish
- Batch to avoid thousands of tiny tasks
Copy code
from prefect import flow, task

@task
def process_batch(batch: list[int]) -> list[int]:
    # One task run handles a whole chunk (swap in your real per-batch logic)
    return [x * 2 for x in batch]

def chunks(seq, n):
    for i in range(0, len(seq), n):
        yield seq[i:i+n]

@flow
def main(items: list[int], batch_size: int = 100):
    batch_futs = [process_batch.submit(batch) for batch in chunks(items, batch_size)]
    # gather or stream results from batch_futs
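For the error-handling point mentioned above, one option is to collect results without raising and inspect failures afterwards; a sketch, assuming `.result(raise_on_failure=False)` is available in your Prefect 2 version (it returns the exception instead of raising it):
```
from prefect import flow, task

@task
def process(x: int) -> int:
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x * 2

@flow
def main_with_errors(items: list[int]):
    futs = [process.submit(x) for x in items]
    ok, failed = [], []
    for x, f in zip(items, futs):
        # Keep the exception instead of letting it propagate out of the flow
        outcome = f.result(raise_on_failure=False)
        if isinstance(outcome, BaseException):
            failed.append((x, outcome))
        else:
            ok.append(outcome)
    return ok, failed

# main_with_errors([1, -2, 3])
```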
Tips
- Use a task runner to get concurrency (a short configuration sketch follows below):
  - Threads (the default ConcurrentTaskRunner) work well for I/O-bound tasks.
  - For more scale, use Dask/Ray task runners. See Task runners.
- If you need to resolve nested containers of futures, either:
  - Call `.result()` on each future (as above), or
  - If you have `resolve_futures_to_data` in 2.20 and are in an async flow, `await` it; otherwise write a tiny resolver that recursively calls `.result()`.

If you share your task runner (threads vs Dask/Ray) and the typical number of items you fan out, I can suggest a batching/concurrency strategy tuned to your workload.
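As a sketch of the task-runner point (assuming the prefect-dask package is installed if you want the Dask option; the thread-based runner ships with Prefect 2):
```
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
# from prefect_dask import DaskTaskRunner  # requires `pip install prefect-dask`

@task
def process(x: int) -> int:
    return x * 2

@flow(task_runner=ConcurrentTaskRunner())  # or task_runner=DaskTaskRunner()
def main(items: list[int]):
    futs = [process.submit(x) for x in items]
    return [f.result() for f in futs]
```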
f
@Marvin as_completed method doesn't exists in Prefect 2.20
m
thought for 53.6 seconds
You’re right — in some Prefect 2.20 environments `as_completed` isn’t available. Here are solid alternatives that work in 2.20 today.

Option 1: Simple fan-out/fan-in (sync)
Copy code
futs = [task_fn.submit(x) for x in items]
# optionally ensure they’ve all finished
from prefect.futures import wait
wait(futs)

results = [f.result() for f in futs]
Option 2: Stream results as they finish (polyfill using wait)
Copy code
from time import sleep
from prefect.futures import wait

def iter_as_completed(futures, poll=0.5):
    pending = set(futures)
    while pending:
        done_and_not = wait(list(pending), timeout=poll)
        for f in done_and_not.done:
            yield f
        pending = done_and_not.not_done
        if pending and poll:
            sleep(poll)

# Usage
futs = [task_fn.submit(x) for x in items]
for f in iter_as_completed(futs):
    result = f.result()
    # handle result incrementally
Option 3: Resolve nested containers of futures (sync)
Copy code
from collections.abc import Mapping
from prefect.futures import PrefectFuture

def resolve_all(obj):
    if isinstance(obj, PrefectFuture):
        return obj.result()
    if isinstance(obj, Mapping):
        return type(obj)({k: resolve_all(v) for k, v in obj.items()})
    if isinstance(obj, (list, tuple, set)):
        seq = [resolve_all(v) for v in obj]
        return tuple(seq) if isinstance(obj, tuple) else type(obj)(seq)
    return obj

# resolved = resolve_all({"a": fut_a, "b": [fut_b1, fut_b2]})
Async flows (if you have resolve_futures_to_data)
- If `resolve_futures_to_data` is present in your 2.20 install, it’s async — you must `await` it from an async `@flow`/`@task`:
Copy code
from prefect import flow
from prefect.futures import resolve_futures_to_data

@flow
async def main():
    futures = {...}
    data = await resolve_futures_to_data(futures)
    return data
Sanity check in your env
Copy code
import prefect, prefect.futures as pf
print(prefect.__version__)
print([n for n in dir(pf) if "as_completed" in n or "resolve_futures" in n])
Docs: prefect.futures API

If you share a short snippet of how you’re submitting and collecting, I can tailor the minimal changes for your exact pattern.