
jcozar

02/06/2023, 12:37 PM
Hi there! Just a question about cache with maps. Description in the thread:
I have a task that is mapped, and it can fail. Let's say the following:
import random
from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

@task(
    persist_result=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def hard_work(th):
    r = random.random()
    if r < th:
        raise ValueError(f"Number less than {th}: {r}")
    return True
The flow is as follows (s3 is a result storage block defined elsewhere):
@flow(
    result_storage=s3,
    persist_result=True,
)
def errors():
    hard_work.map([0.5, 0.5, 0.5, 0.5])
If the flow run fails because some of the mapped tasks failed, and I retry the flow run, then all mapped tasks (successful and failed) finish in state Cached. Is there a way to retry just the failed mapped tasks? Thank you!
I've been digging into it and I finally found a solution to my problem. I implemented this cache key function:
from prefect.states import StateType
from prefect.tasks import task_input_hash

def cache_rerun_not_completed(context, params):
    # if the previous run did not complete, return None so the task runs again
    if context.task_run.state_type != StateType.COMPLETED:
        return None
    # if completed, fall back to Prefect's task_input_hash
    return task_input_hash(context, params)
Basically, if the task run is not completed, the function returns None so the task always runs again. Keep in mind that the function suggested in the documentation does not work here, because it ignores the task run's state (and on a flow run retry the flow_run_id is unchanged, so the key still matches):
def cache_within_flow_run(context, parameters):
    return f"{context.task_run.flow_run_id}-{task_input_hash(context, parameters)}"
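The idea behind the state-aware cache key can be sketched without Prefect at all. This is a minimal illustration, assuming a simple dict-backed cache and made-up names (run_with_state_aware_cache, input_hash are not Prefect APIs): a cached result is reused only if the previous attempt completed, so a failed attempt is always re-executed on retry.

```python
import hashlib
import pickle

# Hypothetical stand-in for a result cache: maps cache key -> (state, result).
_cache = {}

def input_hash(args):
    # Analogous to task_input_hash: the key is derived only from the inputs.
    return hashlib.sha256(pickle.dumps(args)).hexdigest()

def run_with_state_aware_cache(fn, *args):
    key = input_hash((fn.__name__, args))
    entry = _cache.get(key)
    if entry is not None and entry[0] == "COMPLETED":
        # Reuse the cached value only if the previous run completed.
        return entry[1]
    try:
        result = fn(*args)
    except Exception:
        # A failed run leaves no reusable result, so a retry re-executes.
        _cache[key] = ("FAILED", None)
        raise
    _cache[key] = ("COMPLETED", result)
    return result
```

The key point is the state check before the cache hit: an input-only key (like task_input_hash alone) would return the entry regardless of whether the previous attempt succeeded.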
If you think this would be useful, I can contribute it and open a pull request. What do you think?
My mistake, forget about this thread... I was confused by the use of random.