jcozar
02/06/2023, 12:37 PM@task(
persist_result=True,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=1)
)
def hard_work(th):
r = random.random()
if r < th:
raise ValueError(f"Number greater than {th}: {r}")
return True
The flow is as follows:
@flow(
result_storage=s3,
persist_result=True,
)
def errors():
hard_work.map([0.5, 0.5, 0.5, 0.5])
If the flow run fails because of some of the mapped tasks, and I retry the flow run, then all mapped tasks (success and fail) finish in state Cached. Is there a way to retry the failed mapped tasks?
Thank you!def cache_rerun_not_completed(context, params):
if context.task_run.state_type != prefect.states.StateType.COMPLETED:
return None
# if completed, use prefect function task_input_hash
return task_input_hash(context, params)
Basically if the task run is not completed, return None to always run it again. Just to keep in mind, if I use the function suggested in the documentation it does not work because it does not use the state:
def cache_within_flow_run(context, parameters):
return f"{context.task_run.flow_run_id}-{task_input_hash(context, parameters)}"
If you think that this is useful I can contribute and make a pull requests. What do you think?