Martin T
06/10/2022, 12:00 PMAnna Geller
06/10/2022, 12:23 PMMartin T
06/10/2022, 12:42 PMmax_retries
and retry_delay
in combination with raise signals.FAIL/SUCCESS
. The flow is spawn from a main wrapper flow that can pass `Parameter()`'s on.
The optimal values for max_retries
/`retry_delay` are not known up-front and will likely be adjusted several times.
I don't like hardcoding these, and having to release a new flow version to test other values.Anna Geller
06/10/2022, 1:18 PMMartin T
06/10/2022, 1:34 PMraise signals.RETRY
but that doe not respect max_retries
, so I use FAIL
at the moment.Anna Geller
06/10/2022, 1:42 PMimport pendulum
from prefect.engine.signals import RETRY
import awswrangler as wr
def check_if_file_arrived_in_s3():
return wr.s3.does_object_exist("<s3://bucket/example_file.csv>")
@task
def s3_sensor(**kwargs):
bool_s3_object_arrived = check_if_file_arrived_in_s3()
if bool_s3_object_arrived is False:
raise RETRY(
"File not available yet, retrying in 20 seconds.",
start_time=pendulum.now().add(seconds=20),
)
Martin T
06/10/2022, 1:45 PMstart_time
option.
This is a very unfortune thing about *args, **kwargs
documentation style 😐 https://docs.prefect.io/api/latest/engine/signals.html#retry
Even now that you tell me, I cant see what other options i can pass to RETRY()
. I can only find the "state constructor" by reading source code...start_time
or args
?
https://docs.prefect.io/api/latest/engine/state.html#state-2Anna Geller
06/10/2022, 2:01 PMMartin T
06/10/2022, 2:04 PMstart_time
and run_count
🙂 https://docs.prefect.io/api/latest/engine/state.html#retrying