Fredrik Hoem Grelland
10/20/2025, 12:46 PM
cache_policy and cache_key_fn are set on task <prefect.tasks.Task object at 0x7f336981e010>. cache_key_fn will be used.

Marvin
10/20/2025, 12:49 PM
That warning means both cache_policy and cache_key_fn are set on the same task. Prefect 3.x will ignore the policy and use your cache_key_fn. To fix it, pick one approach and remove the other.
Choose one of these:
- If you want built-in caching behavior (inputs, task source, etc.), use only `cache_policy`:
from datetime import timedelta
from prefect import task
from prefect.cache_policies import INPUTS, NO_CACHE, DEFAULT
@task(
    cache_policy=INPUTS,         # or DEFAULT, NO_CACHE, etc.
    cache_expiration=timedelta(days=1),
)
def my_task(x, y):
    ...
- If you want fully custom cache keys, use only `cache_key_fn`:
from datetime import timedelta
from prefect import task
def my_key_fn(ctx, params):
    # ctx: TaskRunContext; params: dict of task arguments
    return f"my_task:{params['x']}:{params['y']}"
@task(
    cache_key_fn=my_key_fn,
    cache_expiration=timedelta(days=1),
)
def my_task(x, y):
    ...
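A cache_key_fn can also opt out of caching per run: if it returns None, no cache key is computed and that run is not cached. A small sketch reusing the parameters above:
def my_key_fn(ctx, params):
    # Skip caching for unusually large inputs; cache everything else by value
    if params["x"] > 1000:
        return None
    return f"my_task:{params['x']}:{params['y']}"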
Notes:
- It’s safe to keep cache_expiration with either approach.
- If you don’t want caching at all, set cache_policy=NO_CACHE.
- If you intentionally prefer the custom function, remove cache_policy to silence the warning; if you prefer the policy, remove cache_key_fn.
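If you need more than one built-in behavior, cache policies in 3.x can be composed with + (a short sketch; verify your installed version supports composition):
from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE
@task(cache_policy=INPUTS + TASK_SOURCE)
def my_task(x, y):
    # Recomputed when either the inputs or the task's source code change
    ...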
Docs for more details:
- Concepts: Task caching in Prefect 3
- How-to: Cache workflow steps
- API: prefect.cache_policies
If you share your task decorator line, I can suggest the exact change to remove the warning.

Fredrik Hoem Grelland
10/20/2025, 1:05 PM
results = resolve_futures_to_results(futures)
I expect this to resolve all the futures to a result and raise any exceptions in the process.
When using this, I see tasks that are still pending end up "Crashed".

Marvin
10/20/2025, 1:08 PM
If any task failed, resolve_futures_to_results will raise, your flow function raises, and the task runner tears down. Any still-running tasks get interrupted and end as Crashed. That's why you see pending tasks become Crashed.
If you want “let all tasks finish, then surface errors,” don’t use resolve_futures_to_results directly. Instead, wait for everything and collect results/errors yourself.
Try one of these patterns:
- Wait for all, then raise an aggregated error
from prefect.futures import wait
futures = [my_task.submit(x) for x in items]
# Block until all futures reach a terminal state (Completed/Failed/Crashed/Cancelled)
wait(futures)
results = []
errors = []
for fut in futures:
    try:
        # Raise the task's underlying exception if it failed
        results.append(fut.result(raise_on_failure=True))
    except Exception as exc:
        errors.append((fut, exc))
if errors:
    # Raise once you've allowed all work to finish
    raise Exception(f"{len(errors)} task(s) failed", errors)
# use results if no errors
- Handle tasks as they complete (good for streaming progress)
from prefect.futures import as_completed
futures = [my_task.submit(x) for x in items]
results = []
errors = []
for fut in as_completed(futures):
    try:
        results.append(fut.result(raise_on_failure=True))
    except Exception as exc:
        errors.append((fut, exc))
if errors:
    raise Exception(f"{len(errors)} task(s) failed", errors)
- Inspect states rather than raising immediately
from prefect.futures import resolve_futures_to_states
futures = [my_task.submit(x) for x in items]
states = resolve_futures_to_states(futures)  # returns states in the same structure as your futures
failed = [s for s in states if s.is_failed()]
crashed = [s for s in states if s.is_crashed()]
if failed or crashed:
    # optionally extract messages/exceptions and raise once
    raise Exception(f"{len(failed)} failed, {len(crashed)} crashed")
Why “Crashed” appears:
- “Failed” means your task code raised an exception.
- “Crashed” means it was interrupted by infrastructure (e.g., the flow raised early, task runner torn down, process killed, timeout, OOM, SIGTERM). When resolve_futures_to_results raises early, that interruption can crash the rest.
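A minimal sketch that reproduces the distinction (the task and flow names are illustrative):
import time
from prefect import flow, task
@task
def boom():
    raise ValueError("task code raised")  # this run ends Failed
@task
def slow():
    time.sleep(30)  # still running when the flow raises -> typically ends Crashed
@flow
def demo():
    f_slow = slow.submit()
    f_boom = boom.submit()
    f_boom.result()  # re-raises ValueError; the flow exits while slow() is still running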
If you share:
- your task runner (ThreadPool vs ProcessPool vs Dask/Ray),
- a short snippet of your loop + call to resolve_futures_to_results,
- any timeouts/retries set,
I can tailor the exact pattern. Docs:
- Futures API: Futures
- States: Prefect states
- Concurrency patterns: Run work concurrently

Fredrik Hoem Grelland
10/20/2025, 1:16 PM

Marvin
10/20/2025, 1:19 PM
resolve_futures_to_results is designed to raise as soon as it encounters a failed future, which can interrupt remaining tasks (leading to Crashed states).
Here are the lowest-boilerplate patterns that avoid early teardown:
- If you can use mapping:
# fan-out
futures = my_task.map(items)
# wait for all and collect results/exceptions in order
results = futures.result(raise_on_failure=False)
# aggregate errors after everything finishes
errors = [(i, r) for i, r in enumerate(results) if isinstance(r, BaseException)]
if errors:
    raise Exception(f"{len(errors)} tasks failed", errors)
- If you’re using submit in a loop:
from prefect.futures import wait
futures = [my_task.submit(x) for x in items]
# ensure all finish; avoids crashing the rest
wait(futures)
# collect without raising during iteration
results = [f.result(raise_on_failure=False) for f in futures]
errors = [(i, r) for i, r in enumerate(results) if isinstance(r, BaseException)]
if errors:
    raise Exception(f"{len(errors)} tasks failed", errors)
- If you prefer states instead of exceptions:
from prefect.futures import resolve_futures_to_states
futures = [my_task.submit(x) for x in items]
states = resolve_futures_to_states(futures)
failed = [s for s in states if s.is_failed()]
crashed = [s for s in states if s.is_crashed()]
if failed or crashed:
    raise Exception(f"{len(failed)} failed, {len(crashed)} crashed")
If you like, you can wrap any of the above into a tiny helper once and reuse it.
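For example (a sketch; the helper name gather_or_raise is made up):
from prefect.futures import wait
def gather_or_raise(futures):
    # Let every task finish, then surface all failures at once
    wait(futures)
    results = [f.result(raise_on_failure=False) for f in futures]
    errors = [(i, r) for i, r in enumerate(results) if isinstance(r, BaseException)]
    if errors:
        raise Exception(f"{len(errors)} task(s) failed", errors)
    return results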
References:
- Futures API: Futures
- Concurrency patterns: Run work concurrently
- States: States
If you share a short snippet of your flow, I can suggest the smallest drop-in helper to match your pattern.

Fredrik Hoem Grelland
10/20/2025, 1:26 PM
@materialize(_asset, task_run_name=f"{task_run.name}-materialize", by="world")
def _materialize(metadata: dict, deferred_exception: Exception | None = None):
    _asset.add_metadata(metadata)
If the metadata dict contains an Exception in one of the values, it silently errors out with:
13:25:18.872 | ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed to process item Event(occurred=DateTime(2025, 10, 20, 13, 25, 18, 820738, tzinfo=Timezone('UTC')), event='prefect.asset.materialization.failed', resource=Resource(root={'prefect.resource.id': 'joy://kill/dave', 'prefect.resource.name': 'dave', 'prefect.asset.url': 'https://youtu.be/dQw4w9WgXcQ'}), related=[RelatedResource(root={'prefect.resource.id': 'world', 'prefect.resource.role': 'asset-materialized-by'}), RelatedResource(root={'prefect.resource.id': 'prefect.flow-run.068f6383-d765-7810-8000-1fdd48848bda', 'prefect.resource.role': 'flow-run', 'prefect.resource.name': 'voracious-muskox'}), RelatedResource(root={'prefect.resource.id': 'prefect.task-run.019a01cb-b558-7000-9792-922b89af2592', 'prefect.resource.role': 'task-run', 'prefect.resource.name': 'dave-a_kill_joy_for_dave-0-materialize'}), RelatedResource(root={'prefect.resource.id': 'prefect.flow.bc2e0553-dde6-4aad-81ce-a9251df25737', 'prefect.resource.role': 'flow', 'prefect.resource.name': 'flow-joy'})], payload={'task': {'name': 'dave-a_kill_joy_for_dave-0', 'id': UUID('019a01cb-b329-7d42-a621-cd94108f2ac4')}, 'state': {'type': 'StateType.FAILED'}, 'exception': RuntimeError('Handled fault: victim was killed as ordered')}, id=UUID('019a01cb-b564-72e6-b97f-11325a4b4b09'), follows=None)
and the task completes with Completed(), but the asset has not been updated.

Marvin
10/20/2025, 1:30 PM
Asset metadata is emitted through the events system after your task returns, and the event payload must be JSON-serializable. A raw Exception value breaks that serialization inside the EventsWorker, so the materialization event is dropped (and only logged) while the task itself still finishes Completed. Sanitize the metadata before calling add_metadata, e.g. with a helper like this:
import datetime as dt
import traceback
import uuid
from typing import Any
def to_jsonable(value: Any) -> Any:
    # primitives are fine
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    # exceptions -> structured dict
    if isinstance(value, BaseException):
        return {
            "type": type(value).__name__,
            "message": str(value),
            "traceback": "".join(
                traceback.format_exception(type(value), value, value.__traceback__)
            ) if value.__traceback__ else None,
        }
    # containers
    if isinstance(value, dict):
        return {str(k): to_jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_jsonable(v) for v in value]
    if isinstance(value, set):
        return [to_jsonable(v) for v in value]
    # common rich types
    if isinstance(value, (dt.datetime, dt.date)):
        return value.isoformat()
    if isinstance(value, uuid.UUID):
        return str(value)
    if isinstance(value, bytes):
        # choose your policy: decode or base64
        return value.decode("utf-8", errors="replace")
    # final fallback: repr
    return repr(value)
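For example, with the exception from your log (the surrounding keys are made up):
>>> to_jsonable({"who": "dave", "error": RuntimeError("Handled fault: victim was killed as ordered")})
{'who': 'dave', 'error': {'type': 'RuntimeError', 'message': 'Handled fault: victim was killed as ordered', 'traceback': None}}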
Use it in your materialize task and, if you want, fail the task on bad input:
from prefect.assets import materialize
@materialize(_asset, task_run_name="{task_run.name}-materialize", by="world")
def _materialize(metadata: dict, deferred_exception: Exception | None = None):
    # sanitize user-provided metadata
    safe_metadata = to_jsonable(metadata)
    # if you want to include the deferred exception, convert it too
    if deferred_exception is not None:
        safe_metadata["deferred_exception"] = to_jsonable(deferred_exception)
    # Optionally pre-validate by attempting a strict JSON round-trip
    # (this catches any missed types before the EventsWorker runs)
    import json
    json.dumps(safe_metadata)  # will raise TypeError if still not serializable
    # Add metadata; event emission happens after task returns
    _asset.add_metadata(safe_metadata)
Notes:
- Event emission is asynchronous; if serialization fails there, you’ll see a log like the one you posted and the asset won’t update, but your task won’t fail. Pre-sanitizing (and optional json.dumps check) avoids this.
- Keep metadata small and JSON-friendly: strings, numbers, booleans, lists, dicts. Convert complex objects to strings or structured dicts.
- If you want the flow to reflect a failure when metadata is invalid, keep the json.dumps check and let it raise; that will fail the task and the materialization will be recorded as failed.
Refs:
- Assets: Assets in Prefect 3
- Events: Events
If you share an example of the metadata you’re passing, I can tune the sanitizer for your exact shapes.