Is it possible to set max_retries on <Tasks> from ...
# prefect-community
m
Is it possible to set max_retries on Tasks from a Parameter, i.e. runtime?
a
There is not - I had an example using KV Store for that in the past, but it was an extremely hacky solution to an extremely loosely defined problem 😄 Can you explain why you would need that?
you could e.g. retry only on specific exception types using retry_on
m
This task checks for data to be ready in an external source using `max_retries` and `retry_delay` in combination with `raise signals.FAIL/SUCCESS`. The flow is spawned from a main wrapper flow that can pass `Parameter()`s on. The optimal values for `max_retries`/`retry_delay` are not known up front and will likely be adjusted several times. I don't like hardcoding these and having to release a new flow version to test other values.
a
Thanks for explaining your use case, this helps! To check whether data arrives, you would need to raise a RETRY signal rather than using retries from a Parameter task. Check the first section of this Discourse topic with a full example.
m
I tried to `raise signals.RETRY` but that does not respect `max_retries`, so I use `FAIL` at the moment.
@Anna Geller discourse link?
import pendulum
from prefect import task
from prefect.engine.signals import RETRY
import awswrangler as wr

def check_if_file_arrived_in_s3():
    # True once the object exists in the bucket
    return wr.s3.does_object_exist("s3://bucket/example_file.csv")

@task
def s3_sensor(**kwargs):
    bool_s3_object_arrived = check_if_file_arrived_in_s3()
    if bool_s3_object_arrived is False:
        # Reschedule this task run 20 seconds from now
        raise RETRY(
            "File not available yet, retrying in 20 seconds.",
            start_time=pendulum.now().add(seconds=20),
        )
m
Okay, this is similar to my case. I did not know of the `start_time` option. This is a very unfortunate thing about the `*args, **kwargs` documentation style 😐 https://docs.prefect.io/api/latest/engine/signals.html#retry Even now that you tell me, I can't see what other options I can pass to `RETRY()`. I can only find the "state constructor" by reading source code...
My first guess is this state constructor, but that does not take any `start_time` or `args`? https://docs.prefect.io/api/latest/engine/state.html#state-2
a
thanks for the feedback on docs. Are you good with your original issue?
m
Yep. I can solve this via `start_time` and `run_count` 🙂 https://docs.prefect.io/api/latest/engine/state.html#retrying
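For anyone landing here later, a minimal sketch of how the `start_time` and `run_count` approach could be wired to runtime values. This is not code from the thread: `next_retry_start` is a hypothetical helper, and it assumes `max_retries` and `retry_delay_seconds` reach the task as plain values (e.g. from `Parameter()`s) and that the current run count is available inside the task, counting from 1 on the first attempt. The helper only makes the decision; the task itself would raise `RETRY(..., start_time=...)` when a time is returned and `FAIL` when it gets `None`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def next_retry_start(
    run_count: int,
    max_retries: int,
    retry_delay_seconds: float,
    now: Optional[datetime] = None,
) -> Optional[datetime]:
    """Return when the next attempt should start, or None if retries are used up.

    run_count is assumed to be 1 on the first attempt, so the number of
    retries already consumed is run_count - 1.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    retries_used = run_count - 1
    if retries_used >= max_retries:
        # Budget exhausted: the caller should fail instead of retrying.
        return None
    return now + timedelta(seconds=retry_delay_seconds)


# Example: third attempt, three retries allowed, 20 second delay.
start = next_retry_start(run_count=3, max_retries=3, retry_delay_seconds=20)
if start is None:
    print("give up")  # in a task: raise FAIL
else:
    print("retry at", start.isoformat())  # in a task: raise RETRY with start_time
```

Since both limits are ordinary function arguments, they can be changed per flow run without releasing a new flow version.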