# ask-marvin
s
@Marvin does Prefect support re-running a task that failed, if it fails in a workflow?
m
thought for 167.7 seconds
Yes, assuming you mean Prefect: it supports automatically re-running (retrying) tasks and flows that fail. In Prefect 3.x:
- You can set retries and delays on both tasks and flows.
- Delays can be fixed, a list of intervals, or generated via a function (e.g., exponential backoff); examples of each follow below.
- You can optionally retry only on certain errors with a retry condition.

Examples

- Fixed delay retries on a task:
```python
from prefect import task

@task(retries=3, retry_delay_seconds=5)
def flaky_task():
    # raise an exception to trigger a retry
    ...
```
- Exponential backoff with optional jitter:
```python
from prefect import task
from prefect.tasks import exponential_backoff

@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),  # 2, 4, 8, 16, 32 (seconds)
    retry_jitter_factor=0.2,  # allow each delay to be up to ~20% longer, to avoid a thundering herd
)
def network_call():
    ...
```
- Flow-level retries (retries the entire flow run if it fails):
```python
from prefect import flow

@flow(retries=2, retry_delay_seconds=10)
def my_flow():
    ...
```
- Conditional retries (only retry for certain exceptions):
```python
from prefect import task

def retry_on_timeouts(task, task_run, state) -> bool:
    # return True to retry, False to fail immediately;
    # the callable receives the task, the task run, and the final Failed state
    try:
        state.result()  # re-raises the exception that failed the task
    except TimeoutError:
        return True
    except Exception:
        return False
    return False

@task(retries=3, retry_delay_seconds=5, retry_condition_fn=retry_on_timeouts)
def sometimes_times_out():
    ...
```
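The list-of-intervals option mentioned above works like this (a minimal sketch; the task name and delay values are just illustrative):

```python
from prefect import task

@task(retries=3, retry_delay_seconds=[5, 30, 120])
def staged_backoff_task():
    # waits 5s before retry 1, 30s before retry 2, 120s before retry 3
    ...
```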
A couple notes:
- Retries happen within the same flow run.
- If you want to “resume” a failed run by only re-executing failed work, consider enabling result persistence and caching so previously successful tasks can be skipped when you re-run the flow.

Docs for more detail:
- Task API (retries, backoff)
- Prefect blog: scaling and resilience patterns

Quick clarification: are you looking for automatic retries during the same run (as above), or a way to re-run only the failed task after the run has already finished?
s
@Marvin my workflow fails at a task, and the next day I have to re-run it from that same failed task
m
thought for 202.9 seconds
Got it: you want to re-run the workflow the next day and pick up from the task that failed, without re-running tasks that already succeeded. In Prefect 3.x there isn’t a “resume from failed task” button, but you can get the same behavior by enabling result persistence and caching on your tasks. Then, when you re-run the flow with the same inputs, previously successful tasks will enter a Cached state and be skipped; only the failed/missing tasks will run.

How to set it up

1) Persist task results and enable input-based caching:
```python
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(
    persist_result=True,                 # write results to storage
    cache_key_fn=task_input_hash,        # reuse results when inputs match
    cache_expiration=timedelta(days=7),  # keep results available across days
)
def heavy_step(x):
    ...

@flow
def my_flow(x):
    a = heavy_step(x)
    b = heavy_step(x + 1)  # example of another cached step
    ...
```
2) Configure where results are stored (so they’re available across runs). For example, use a remote bucket:
```python
from prefect import flow
from prefect.filesystems import RemoteFileSystem

storage = RemoteFileSystem(basepath="s3://my-bucket/prefect-results")

@flow(result_storage=storage)
def my_flow(x):
    ...
```
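If you want to try the pattern locally before wiring up a bucket, a local filesystem block works the same way (a minimal sketch; the path is only an example):

```python
from prefect import flow
from prefect.filesystems import LocalFileSystem

# results written under this directory survive across flow runs on the same machine
storage = LocalFileSystem(basepath="/tmp/prefect-results")

@flow(result_storage=storage)
def my_flow(x):
    ...
```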
3) Re-run the flow with the same parameters the next day (UI or CLI). Successful tasks will be skipped due to cached results; the failed task(s) will execute.

- CLI example (verified):
```bash
prefect deployment run my-flow/prod -p x=1 --watch
```
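If you prefer to trigger the re-run from Python instead of the CLI (for example from a script or another flow), `run_deployment` does the same thing. A sketch using the same deployment name and parameter as the CLI example above:

```python
from prefect.deployments import run_deployment

# creates a new flow run for the deployment; by default it waits
# for the run to reach a terminal state before returning
flow_run = run_deployment(name="my-flow/prod", parameters={"x": 1})
```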
Notes
- Retries (`retries=...`) are for automatic re-attempts within the same flow run. What you’re asking for is a new run that only re-executes failed work; caching + persisted results is the pattern for that.
- If you map over many items, input-hash caching ensures only the items that failed (or are new) will re-run on the next execution.
- Choose a `cache_expiration` that exceeds your typical “resume” window.

Docs
- Caching and persisted results
- Task API (persist_result, cache_key_fn, cache_expiration)

Want me to help apply this to your specific flow? If you can share how the failing task is called and where you’d like to store results (local, S3, GCS, etc.), I can sketch the exact code.
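As a rough illustration of the mapping note above (the task, flow, and parameter names are made up), per-item caching looks like this:

```python
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(
    persist_result=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=7),
)
def process_item(item: str):
    ...

@flow
def process_all(items: list[str]):
    # .map() creates one task run per item; on the next day's re-run,
    # items whose inputs hash to an existing cached result are skipped
    return process_item.map(items)
```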
s
@Marvin other platforms provide human-in-the-loop features like waiting for user approval or input. What do you think about this implementation? Other platforms can keep hundreds of flows in a waiting state without consuming worker resources.
m
thought for 117.1 seconds
You’re describing “human-in-the-loop” approvals and long waits without tying up compute. Prefect 3.x supports this pattern natively, and waiting runs do not consume worker resources.

How it works in Prefect
- A flow can suspend itself (or pause) and optionally request typed input. While suspended, no worker slot or process is held; the run sits with the Prefect API in a Paused/Suspended state until resumed.
- When input is provided (via UI or API), the run is resumed and scheduled for a worker to pick up and continue.

A simple approval gate:
```python
from pydantic import BaseModel
from prefect import flow
from prefect.flow_runs import suspend_flow_run

class Approval(BaseModel):
    approved: bool
    note: str | None = None

@flow
def my_flow():
    # ... do some work
    approval = suspend_flow_run(
        wait_for_input=Approval,  # UI/API will require these fields
        key="gate-1",             # useful if you have multiple gates
        timeout=24*3600           # optional: auto-timeout after 24h
    )
    if not approval or not approval.approved:
        raise RuntimeError("Not approved")
    # continue workflow...
```
Supplying the input (to resume)
- In the UI: open the flow run (it will show “Awaiting input”), fill the form, click Resume.
- Programmatically:
```python
from prefect.flow_runs import resume_flow_run

resume_flow_run(
    flow_run_id="00000000-0000-0000-0000-000000000000",
    run_input={"approved": True, "note": "LGTM"}
)
```
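To resume programmatically you first need the flow run ID. One way to find waiting runs is via the Python client; this is only a sketch and assumes the filter classes as I recall them from the client schemas (`FlowRunFilter`, `FlowRunFilterState`, `FlowRunFilterStateName`), so double-check the imports against your Prefect version:

```python
import asyncio
from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)

async def list_waiting_runs():
    async with get_client() as client:
        # filter for runs whose state name is "Suspended" (or "Paused")
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Suspended", "Paused"])
                )
            )
        )

runs = asyncio.run(list_waiting_runs())
print([run.id for run in runs])
```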
Scaling to hundreds of waiting flows
- This is a supported use case. Paused/Suspended runs are persisted by the orchestrator; workers are not tied up while they wait.
- When resumed, the scheduler queues the run for workers again.
- You can add a timeout (as above) and/or use Automations to act on long-waiting runs (e.g., auto-cancel, notify, or auto-resume on an external event).

Related APIs and docs
- REST API: Resume a flow run (resume-flow-run)
- Python helpers for pausing/suspending and inputs (module index): prefect.input and prefect.flow_runs

If you share your exact approval UX (UI-only vs programmatic, expected fields, timeouts), I can tailor the code and an automation to match.