# ask-marvin
@Marvin How can I fix this?
12:44:50.174 | WARNING | prefect.tasks - Both `cache_policy` and `cache_key_fn` are set on task <prefect.tasks.Task object at 0x7f336981e010>. `cache_key_fn` will be used.
thought for 200.7 seconds
That warning means you've set both `cache_policy` and `cache_key_fn` on the same task. Prefect 3.x will ignore the policy and use your `cache_key_fn`. To fix it, pick one approach and remove the other. Choose one of these:

- If you want built-in caching behavior (inputs, task source, etc.), use only `cache_policy`:

```python
from datetime import timedelta
from prefect import task
from prefect.cache_policies import INPUTS, NO_CACHE, DEFAULT

@task(
    cache_policy=INPUTS,         # or DEFAULT, NO_CACHE, etc.
    cache_expiration=timedelta(days=1),
)
def my_task(x, y):
    ...
```

- If you want fully custom cache keys, use only `cache_key_fn`:

```python
from datetime import timedelta
from prefect import task

def my_key_fn(ctx, params):
    # ctx: TaskRunContext; params: dict of task arguments
    return f"my_task:{params['x']}:{params['y']}"

@task(
    cache_key_fn=my_key_fn,
    cache_expiration=timedelta(days=1),
)
def my_task(x, y):
    ...
```

Notes:
- It's safe to keep `cache_expiration` with either approach.
- If you don't want caching at all, set `cache_policy=NO_CACHE` (sketched below).
- If you intentionally prefer the custom function, remove `cache_policy` to silence the warning; if you prefer the policy, remove `cache_key_fn`.

Docs for more details:
- Concepts: Task caching in Prefect 3
- How-to: Cache workflow steps
- API: prefect.cache_policies

If you share your task decorator line, I can suggest the exact change to remove the warning.
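For example, a minimal sketch of the `NO_CACHE` option mentioned above (reusing the `my_task` example from earlier):

```python
from prefect import task
from prefect.cache_policies import NO_CACHE

# Disable caching entirely; with no cache_key_fn set, the warning goes away
@task(cache_policy=NO_CACHE)
def my_task(x, y):
    ...
```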
@Marvin I am running multiple tasks with `submit`, and then after the for loop calling `results = resolve_futures_to_results(futures)`. I expect this to resolve all the futures to results and raise any exceptions in the process. When using this, I see tasks that are still pending end up "Crashed" in state.
thought for 229.8 seconds
Short answer: if one future fails, `resolve_futures_to_results` will raise, your flow function raises, and the task runner tears down. Any still-running tasks get interrupted and end as Crashed. That's why you see pending tasks become Crashed.

If you want "let all tasks finish, then surface errors," don't use `resolve_futures_to_results` directly. Instead, wait for everything and collect results/errors yourself. Try one of these patterns:

- Wait for all, then raise an aggregated error:

```python
from prefect.futures import wait

futures = [my_task.submit(x) for x in items]

# Block until all futures reach a terminal state (Completed/Failed/Crashed/Cancelled)
wait(futures)

results = []
errors = []
for fut in futures:
    try:
        # Raise the task's underlying exception if it failed
        results.append(fut.result(raise_on_failure=True))
    except Exception as exc:
        errors.append((fut, exc))

if errors:
    # Raise once you've allowed all work to finish
    raise Exception(f"{len(errors)} task(s) failed", errors)

# use results if no errors
```

- Handle tasks as they complete (good for streaming progress):

```python
from prefect.futures import as_completed

futures = [my_task.submit(x) for x in items]

results = []
errors = []
for fut in as_completed(futures):
    try:
        results.append(fut.result(raise_on_failure=True))
    except Exception as exc:
        errors.append((fut, exc))

if errors:
    raise Exception(f"{len(errors)} task(s) failed", errors)
```

- Inspect states rather than raising immediately:

```python
from prefect.futures import resolve_futures_to_states

futures = [my_task.submit(x) for x in items]
states = resolve_futures_to_states(futures)  # mirrors the structure of your futures list

failed = [s for s in states if s.is_failed()]
crashed = [s for s in states if s.is_crashed()]

if failed or crashed:
    # optionally extract messages/exceptions and raise once
    raise Exception(f"{len(failed)} failed, {len(crashed)} crashed")
```

Why "Crashed" appears:
- "Failed" means your task code raised an exception.
- "Crashed" means it was interrupted by infrastructure (e.g., the flow raised early, the task runner was torn down, the process was killed, a timeout, OOM, SIGTERM). When `resolve_futures_to_results` raises early, that interruption can crash the rest.

If you share:
- your task runner (ThreadPool vs ProcessPool vs Dask/Ray); there's a sketch after this list showing where that is configured,
- a short snippet of your loop + the call to `resolve_futures_to_results`,
- any timeouts/retries set,

I can tailor the exact pattern.

Docs:
- Futures API: Futures
- States: Prefect states
- Concurrency patterns: Run work concurrently
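For reference, a minimal sketch of where the task runner is configured (assuming `ThreadPoolTaskRunner` here; swap in whichever runner you actually use):

```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def my_task(x):
    return x * 2

# The task runner is declared on the flow; submitted tasks run on it,
# and it is what gets torn down if the flow function raises early.
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def my_flow(items):
    futures = [my_task.submit(x) for x in items]
    return [f.result(raise_on_failure=False) for f in futures]
```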
@Marvin OK, using the `as_completed` function works, but it is a lot of boilerplate for something that I expect to work. Are there no wrappers that can just take care of this? Since 3.0 we must raise exceptions to fail tasks, so this is a pretty standard pattern.
thought for 178.7 seconds
I hear you; boilerplate isn't fun. There isn't a single built-in "wait for all, then raise aggregated errors" wrapper in Prefect 3.x. `resolve_futures_to_results` is designed to raise as soon as it encounters a failed future, which can interrupt remaining tasks (leading to Crashed states).

Here are the lowest-boilerplate patterns that avoid early teardown:

- If you can use mapping:

```python
# fan-out
futures = my_task.map(items)

# wait for all and collect results/exceptions in order
results = futures.result(raise_on_failure=False)

# aggregate errors after everything finishes
errors = [(i, r) for i, r in enumerate(results) if isinstance(r, BaseException)]
if errors:
    raise Exception(f"{len(errors)} tasks failed", errors)
```

- If you're using `submit` in a loop:

```python
from prefect.futures import wait

futures = [my_task.submit(x) for x in items]

# ensure all finish; avoids crashing the rest
wait(futures)

# collect without raising during iteration
results = [f.result(raise_on_failure=False) for f in futures]
errors = [(i, r) for i, r in enumerate(results) if isinstance(r, BaseException)]
if errors:
    raise Exception(f"{len(errors)} tasks failed", errors)
```

- If you prefer states instead of exceptions:

```python
from prefect.futures import resolve_futures_to_states

futures = [my_task.submit(x) for x in items]
states = resolve_futures_to_states(futures)

failed = [s for s in states if s.is_failed()]
crashed = [s for s in states if s.is_crashed()]

if failed or crashed:
    raise Exception(f"{len(failed)} failed, {len(crashed)} crashed")
```

If you like, you can wrap any of the above into a tiny helper once and reuse it; a sketch follows.

References:
- Futures API: Futures
- Concurrency patterns: Run work concurrently
- States: States

If you share a short snippet of your flow, I can suggest the smallest drop-in helper to match your pattern.
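For instance, a small helper along these lines (a sketch, not a Prefect built-in; the name `gather_results` is made up) reduces the per-flow boilerplate to one call:

```python
from prefect.futures import wait

def gather_results(futures):
    """Wait for all futures, then return results or raise one aggregated error.

    Hypothetical helper, not part of Prefect; adapt the error handling to taste.
    """
    wait(futures)  # let every task reach a terminal state first
    results, errors = [], []
    for fut in futures:
        try:
            results.append(fut.result(raise_on_failure=True))
        except Exception as exc:
            errors.append(exc)
    if errors:
        raise Exception(f"{len(errors)} task(s) failed", errors)
    return results

# usage inside a flow:
#   futures = [my_task.submit(x) for x in items]
#   results = gather_results(futures)
```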
@Marvin Thanks, I will live with it. Can you help me understand how I can create safe code for this:
```python
@materialize(_asset, task_run_name=f"{task_run.name}-materialize", by="world")
def _materialize(metadata: dict, deferred_exception: Exception | None = None):
    _asset.add_metadata(metadata)
```
If the metadata dict contains an Exception in one of the values, it silently errors out with:

```
13:25:18.872 | ERROR   | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed to process item Event(occurred=DateTime(2025, 10, 20, 13, 25, 18, 820738, tzinfo=Timezone('UTC')), event='prefect.asset.materialization.failed', resource=Resource(root={'prefect.resource.id': 'joy://kill/dave', 'prefect.resource.name': 'dave', 'prefect.asset.url': 'https://youtu.be/dQw4w9WgXcQ'}), related=[RelatedResource(root={'prefect.resource.id': 'world', 'prefect.resource.role': 'asset-materialized-by'}), RelatedResource(root={'prefect.resource.id': 'prefect.flow-run.068f6383-d765-7810-8000-1fdd48848bda', 'prefect.resource.role': 'flow-run', 'prefect.resource.name': 'voracious-muskox'}), RelatedResource(root={'prefect.resource.id': 'prefect.task-run.019a01cb-b558-7000-9792-922b89af2592', 'prefect.resource.role': 'task-run', 'prefect.resource.name': 'dave-a_kill_joy_for_dave-0-materialize'}), RelatedResource(root={'prefect.resource.id': 'prefect.flow.bc2e0553-dde6-4aad-81ce-a9251df25737', 'prefect.resource.role': 'flow', 'prefect.resource.name': 'flow-joy'})], payload={'task': {'name': 'dave-a_kill_joy_for_dave-0', 'id': UUID('019a01cb-b329-7d42-a621-cd94108f2ac4')}, 'state': {'type': 'StateType.FAILED'}, 'exception': RuntimeError('Handled fault: victim was killed as ordered')}, id=UUID('019a01cb-b564-72e6-b97f-11325a4b4b09'), follows=None)
```

and the task completes with Completed(), but the asset has not been updated.
thought for 246.0 seconds
You're running into metadata serialization. Asset materialization metadata must be JSON-serializable. If you pass an Exception object (or any non-serializable value) in the metadata dict, the background EventsWorker fails to serialize the event, logs an error, and the asset update is dropped. That failure does not propagate to your task, so your task still shows Completed.

Two parts to make this safe:
1) Sanitize metadata before adding it.
2) Optionally, fail the task if metadata can't be made serializable.

Here's a small, reusable sanitizer that recursively converts common non-JSON types (Exceptions, datetime, UUID, sets, bytes, etc.) into safe forms:

```python
import datetime as dt
import traceback
import uuid
from typing import Any

def to_jsonable(value: Any) -> Any:
    # primitives are fine
    if value is None or isinstance(value, (str, int, float, bool)):
        return value

    # exceptions -> structured dict
    if isinstance(value, BaseException):
        return {
            "type": type(value).__name__,
            "message": str(value),
            "traceback": "".join(
                traceback.format_exception(type(value), value, value.__traceback__)
            ) if value.__traceback__ else None,
        }

    # containers
    if isinstance(value, dict):
        return {str(k): to_jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_jsonable(v) for v in value]
    if isinstance(value, set):
        return [to_jsonable(v) for v in value]

    # common rich types
    if isinstance(value, (dt.datetime, dt.date)):
        return value.isoformat()
    if isinstance(value, uuid.UUID):
        return str(value)
    if isinstance(value, bytes):
        # choose your policy: decode or base64
        return value.decode("utf-8", errors="replace")

    # final fallback: repr
    return repr(value)
```

Use it in your materialize task and, if you want, fail the task on bad input:

```python
from prefect.assets import materialize

@materialize(_asset, task_run_name="{task_run.name}-materialize", by="world")
def _materialize(metadata: dict, deferred_exception: Exception | None = None):
    # sanitize user-provided metadata
    safe_metadata = to_jsonable(metadata)

    # if you want to include the deferred exception, convert it too
    if deferred_exception is not None:
        safe_metadata["deferred_exception"] = to_jsonable(deferred_exception)

    # Optionally pre-validate by attempting a strict JSON round-trip
    # (this catches any missed types before the EventsWorker runs)
    import json
    json.dumps(safe_metadata)  # will raise TypeError if still not serializable

    # Add metadata; event emission happens after task returns
    _asset.add_metadata(safe_metadata)
```

Notes:
- Event emission is asynchronous; if serialization fails there, you'll see a log like the one you posted and the asset won't update, but your task won't fail. Pre-sanitizing (plus the optional json.dumps check) avoids this.
- Keep metadata small and JSON-friendly: strings, numbers, booleans, lists, dicts. Convert complex objects to strings or structured dicts.
- If you want the flow to reflect a failure when metadata is invalid, keep the json.dumps check and let it raise; that will fail the task and the materialization will be recorded as failed.

Refs:
- Assets: Assets in Prefect 3
- Events: Events

If you share an example of the metadata you're passing, I can tune the sanitizer for your exact shapes.
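For illustration, a rough example of what `to_jsonable` produces for a metadata dict containing an exception (the values here are made up; the output shape is approximate):

```python
# Hypothetical metadata with a non-JSON value in it
metadata = {"rows_written": 128, "error": RuntimeError("Handled fault: victim was killed as ordered")}

safe = to_jsonable(metadata)
# safe is now JSON-serializable, roughly:
# {"rows_written": 128,
#  "error": {"type": "RuntimeError",
#            "message": "Handled fault: victim was killed as ordered",
#            "traceback": None}}
```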