<@ULVA73B9P> ``` with tags('video-ingestion'):...
# ask-marvin
t
@Marvin
Copy code
with tags('video-ingestion'):
        for vid in vids:
            future = ingest_one_video.submit(vid)
            processing.add(future)

            if len(processing) > concurrency_lim:
                for done in as_completed(list(processing)):
                    processing.remove(done)
                    result = done.result()  # Get the actual result
                    if result is not None:  # Only add non-None results
                        results.append(result)
                    completed += 1
                    break
        
        for done in as_completed(list(processing)):
            processing.remove(done)
            result = done.result()  # Get the actual result
            if result is not None:  # Only add non-None results
                results.append(result)
            completed += 1
How do I make it such that when one of the task failed/crashed, we just skip it instead of making the entire flow crash?
m
thought for 242.7 seconds
t
Human 1 Agentic AI 0
n
my child has not outstripped me yet
t
W nate
m
Great question. In Prefect 3.x, calling
future.result()
on a failed/crashed task raises an exception and will bubble up unless you catch it. To “skip” failed tasks and keep the flow running, either: A) Catch exceptions around
result()
, or B) Don’t raise in the first place and check the task’s state Here are two concise patterns you can use. 1) Easiest: try/except around result()
Copy code
from prefect import get_run_logger
from prefect.futures import as_completed

logger = get_run_logger()

with tags('video-ingestion'):
    for vid in vids:
        future = ingest_one_video.submit(vid)
        processing.add(future)

        if len(processing) > concurrency_lim:
            for done in as_completed(list(processing)):
                processing.remove(done)
                try:
                    result = done.result()  # may raise if the task failed
                except Exception as exc:
                    logger.warning(f"Skipping failed/crashed task: {exc}")
                    completed += 1
                    break
                else:
                    if result is not None:
                        results.append(result)
                    completed += 1
                    break

    for done in as_completed(list(processing)):
        processing.remove(done)
        try:
            result = done.result()
        except Exception as exc:
            logger.warning(f"Skipping failed/crashed task: {exc}")
        else:
            if result is not None:
                results.append(result)
        completed += 1
2) Don’t raise; check state first
Copy code
from prefect import get_run_logger
from prefect.futures import as_completed
from prefect.schemas import StateType

logger = get_run_logger()

with tags('video-ingestion'):
    for vid in vids:
        future = ingest_one_video.submit(vid)
        processing.add(future)

        if len(processing) > concurrency_lim:
            for done in as_completed(list(processing)):
                processing.remove(done)
                if done.state.type in {StateType.FAILED, StateType.CRASHED, StateType.CANCELLED}:
                    logger.warning(f"Skipping task due to state={done.state.type}: {done.state.message}")
                    completed += 1
                    break
                result = done.result()  # safe because state is completed
                if result is not None:
                    results.append(result)
                completed += 1
                break

    for done in as_completed(list(processing)):
        processing.remove(done)
        if done.state.type in {StateType.FAILED, StateType.CRASHED, StateType.CANCELLED}:
            logger.warning(f"Skipping task due to state={done.state.type}: {done.state.message}")
        else:
            result = done.result()
            if result is not None:
                results.append(result)
        completed += 1
Notes: - If you prefer not to raise at all, you can also do
done.result(raise_on_failure=False)
and then branch on
done.state
as shown above. - Your flow will stay “Completed” as long as you don’t let exceptions escape the flow function; failed tasks will still be recorded as failed task runs. - Consider adding retries to the task to reduce transient failures, e.g.
@task(retries=3, retry_delay_seconds=10)
. Reference: Prefect futures/state behavior is implemented in the library; see source for result semantics and state handling in Prefect source.
t
@Nate nit: seems like marvin is a bit confused on the import types? Feels like it should be either:
Copy code
from prefect.states import StateType
or
Copy code
from prefect.client.schemas import StateType
Maybe some additional prompt engineering would help?
n
yea it has a tool to check imports, sometimes it remembers to use it, sometimes it doesn't feel free to open a PR if you have a suggestion on the prompt!
❤️ 1