# prefect-cloud
Johan sh
Hello! I am stuck on a basic problem: I have a task in a flow that has a "retries" parameter. How do I re-run the task from scratch when the flow retries? The task keeps returning the cached result. I tried playing with the "cache_key_fn", "refresh_cache" and "cache_expiration" parameters, but it did not work... I made a very basic example (attached file), where
print('- Report Run ID: ' + report_run_id + ' -')
always gives the same result when the flow retries... 😢 (which means it does not generate a new number after the flow retries). Can somebody help me? Thanks a lot!!
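A hypothetical minimal reproduction of the situation described above (the attached file is not shown here; the task name, the random ID range, and retries=1 below are assumptions, not the actual attachment):

```python
import random

from prefect import flow, task


@task
def generate_report_run_id() -> str:
    # Hypothetical stand-in for the attached example: a random ID that,
    # per the question, is expected to be regenerated on every flow retry.
    return str(random.randint(100000, 999999))


@flow(retries=1, log_prints=True)
def report_flow():
    report_run_id = generate_report_run_id()
    print('- Report Run ID: ' + report_run_id + ' -')
    # Fail on purpose so the flow retries; the reported behaviour is that
    # the second attempt prints the same ID instead of a new one.
    raise ValueError("force a flow retry")


if __name__ == "__main__":
    report_flow()
```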
Nate
hi @Johan sh - what about something like this? i.e. use the `run_count` as part of your cache key
```python
from prefect import flow, task
from prefect.runtime import flow_run
from prefect.tasks import task_input_hash


def my_cache_fn(*args, **kwargs):
    return task_input_hash(*args, **kwargs) or "" + str(flow_run.run_count)


@task(cache_key_fn=my_cache_fn)
def some_task() -> str:
    return "some task"


@flow(retries=1, log_prints=True)
def my_flow():
    print(f"this is try number {flow_run.run_count} of this flow")
    some_task()
    raise ValueError("this is a failure")


if __name__ == "__main__":
    my_flow()
```
i would probably add some prefix (like `flow_run.id`) to `str(flow_run.run_count)` for the case where the result from `task_input_hash` is falsy for some reason
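A minimal sketch of that suggestion (the exact key format is an assumption; note the explicit grouping, since `+` binds tighter than `or` in Python, so `task_input_hash(...) or "" + str(...)` returns only the hash whenever the hash is truthy):

```python
from prefect import task
from prefect.runtime import flow_run
from prefect.tasks import task_input_hash


def my_cache_fn(context, parameters) -> str:
    # Prefix with the flow run id so the key stays unique per flow run even
    # when task_input_hash returns a falsy value, and append the attempt
    # number so the key changes on every retry of the flow.
    input_hash = task_input_hash(context, parameters) or ""
    return f"{flow_run.id}-{input_hash}-{flow_run.run_count}"


@task(cache_key_fn=my_cache_fn)
def some_task() -> str:
    return "some task"
```

The `flow_run.id` prefix covers the falsy-hash case mentioned above, and the appended `run_count` is what makes the key differ between attempts of the same flow run.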
Johan sh
Hi @Nate, thanks a lot for your time! It is still not working... Same IDs...