Deceivious
03/08/2023, 9:33 AM
Running . I've written a flow [code below] that
1. runs every minute on a single concurrency queue
2. has a task that retries 5 times with 1 min of retry delay.
Any new flow run is immediately set to the Late state. This behavior kind of makes sense, but I would like the task retries to be non-blocking at the flow level.
I am guessing the easiest way to do this would be to move the retries from the task level to the flow level, and to cache and persist the task result? But this would lose the retry attempt counter for each individual task.
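To make the trade-off concrete, here is a minimal plain-Python sketch of the idea, not Prefect code. It assumes a hypothetical in-memory `result_cache` standing in for persisted task results, and hypothetical helpers `cached_task` and `run_flow_with_retries` standing in for a cached task and a flow with retries. In Prefect 2 the rough equivalents would be `retries` on `@flow` plus `cache_key_fn` and `persist_result` on `@task`, but that mapping is an assumption and is not exercised here.

```python
# Hypothetical in-memory stand-in for a persisted result store.
result_cache = {}
# Manual per-task attempt counter, since retrying at flow level does not keep one.
attempts = {}
# Counts how often each task body actually executes.
calls = {"stable": 0, "flaky": 0}


def cached_task(name, fn):
    """Run fn unless a result for this name is already cached."""
    if name in result_cache:
        return result_cache[name]
    attempts[name] = attempts.get(name, 0) + 1
    result = fn()  # may raise; nothing is cached in that case
    result_cache[name] = result
    return result


def run_flow_with_retries(flow_fn, retries):
    """Call flow_fn up to retries + 1 times, re-raising the last error."""
    for attempt in range(retries + 1):
        try:
            return flow_fn()
        except ValueError:
            if attempt == retries:
                raise
            # A real scheduler would wait for the retry delay here.


def stable():
    calls["stable"] += 1
    return "ok"


def flaky():
    # Fails on the first two calls, then succeeds.
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ValueError("transient failure")
    return "done"


def main():
    first = cached_task("stable", stable)
    second = cached_task("flaky", flaky)
    return first, second


result = run_flow_with_retries(main, retries=5)
print(result, calls, attempts)
```

The succeeded task body runs once even though the flow is attempted three times, and the per-task attempt count only survives because the sketch tracks it by hand in `attempts`.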
Looking for the community's view on this approach. 👀
from prefect import flow, task
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import IntervalSchedule
import datetime

@flow(name="fail_test")
def main():
    run()


@task(retries=5, retry_delay_seconds=60)
def run():
    # some code to make this fail 1 out of 2 times
    raise ValueError("Always fail.")


if __name__ == "__main__":
    deployment = Deployment.build_from_flow(
        flow=main,
        name="fail_deployment",
        work_queue_name="sync_queue",  # ensure this queue has 1 concurrency
        schedule=IntervalSchedule(interval=datetime.timedelta(minutes=1)),
    )
    deployment.apply()
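A rough back-of-the-envelope model of why runs pile up as Late with these settings, in plain Python rather than Prefect. It assumes the failing run holds the queue's single slot for its whole retry sequence and that every attempt fails, as in the code above. The `late_runs` helper and the 1-second `attempt_runtime_s` are hypothetical values chosen for illustration.

```python
def late_runs(interval_s, retries, retry_delay_s, attempt_runtime_s):
    """Return how long one always-failing run holds the slot, and which
    scheduled start times (seconds after the first run) fall inside that window."""
    # Every attempt's runtime plus the delay before each retry.
    held_s = (retries + 1) * attempt_runtime_s + retries * retry_delay_s
    # Runs scheduled after the first one starts and before the slot is released.
    blocked = list(range(interval_s, held_s, interval_s))
    return held_s, blocked


held_s, blocked = late_runs(
    interval_s=60,        # schedule interval from the deployment
    retries=5,            # task retries
    retry_delay_s=60,     # retry_delay_seconds
    attempt_runtime_s=1,  # assumed runtime per attempt (hypothetical)
)
print(held_s, blocked)
```

Under these assumptions the slot is held for 306 seconds, so the five runs scheduled at 60, 120, 180, 240 and 300 seconds cannot start on time.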