# ask-community
d
Hi everyone, I have a flow where certain tasks can fail, so their downstream dependencies fail too and the flow run finishes. Is it possible to rerun the flow run so that only the tasks that failed run again?
j
hey Derek, i think caching would be a solution to your problem (https://docs.prefect.io/2.14.3/concepts/tasks/?h=caching#caching). On a rerun, completed tasks would go directly into the "Cached" state and only previously failed, crashed or "not ready" tasks would run again.
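To make the "only failed tasks run again" idea concrete, here is a plain-Python toy model, not Prefect code. It assumes a result store that survives between flow runs (the module-level `CACHE` dict stands in for Prefect's persisted results), and the `run_task`, `extract` and `load` names are hypothetical. Only successful results are stored, so on the second run the completed task is a cache hit and only the previously failed one executes.

```python
import hashlib
import json
from typing import Any, Callable, Dict, List

# Toy stand-in for persisted task results that survive between flow runs.
CACHE: Dict[str, Any] = {}

def cache_key(fn: Callable[..., Any], *args: Any) -> str:
    """Key a call on the task's name plus its JSON-serialised arguments."""
    payload = json.dumps([fn.__qualname__, list(args)], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

def run_task(fn: Callable[..., Any], *args: Any) -> str:
    """Return "Cached" on a hit; otherwise run fn and report the outcome."""
    key = cache_key(fn, *args)
    if key in CACHE:
        return "Cached"
    try:
        CACHE[key] = fn(*args)  # only successful results are stored
        return "Completed"
    except Exception:
        return "Failed"  # nothing is cached, so the next run retries it

fail_load = True  # simulates a transient problem in the second task

def extract(source: str) -> str:
    return f"raw:{source}"

def load(source: str) -> str:
    if fail_load:
        raise RuntimeError("transient failure")
    return f"loaded:{source}"

def flow_run() -> List[str]:
    return [run_task(extract, "orders"), run_task(load, "orders")]

first = flow_run()
fail_load = False  # the problem is fixed before the rerun
second = flow_run()
print(first)   # ['Completed', 'Failed']
print(second)  # ['Cached', 'Completed']
```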
d
How do you rerun then?
By using the same cache key?
j
yes, it depends on the cache key function that you use, but if you use the task_input_hash function, it should produce the same cache key on a rerun, as long as the code and arguments stay the same between runs.
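As we understand it, `task_input_hash` combines the task's identity, the task function's code and the call's arguments into one hash (check the Prefect source for your version). The sketch below is a hypothetical plain-Python analogue of that idea, built on `hashlib` and `json`; `input_hash_key` is not a Prefect function. It shows that repeating a call with identical arguments reproduces the key, while a different argument set yields a different one.

```python
import hashlib
import json
from typing import Any, Callable, Dict

def input_hash_key(fn: Callable[..., Any], arguments: Dict[str, Any]) -> str:
    """Toy analogue of an input-hash cache key: task identity + code + arguments."""
    parts = [
        fn.__qualname__,            # which task
        fn.__code__.co_code.hex(),  # the task's compiled bytecode
        arguments,                  # the bound arguments for this call
    ]
    payload = json.dumps(parts, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

def reused_task(x, param1, param2):
    return f"{x}-{param1}-{param2}"

k1 = input_hash_key(reused_task, {"x": "foo", "param1": "x", "param2": "y"})
k1_rerun = input_hash_key(reused_task, {"x": "foo", "param1": "x", "param2": "y"})
k2 = input_hash_key(reused_task, {"x": "bar", "param1": "p", "param2": "q"})
print(k1 == k1_rerun, k1 == k2)  # True False
```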
d
Sounds good. I'll try and implement / test this week and get back here with some updates. Thanks!
@Justin Trautmann I am trying to reuse a task and pass in different parameters. I think the changing parameters passed to the downstream task are keeping this from working as expected. For instance, if I rerun the flow and the same parameters are passed the second time around, it does not use the cached result of a task that ran successfully in the previous run.
Let me know if you have some time to discuss. I'm wondering if it's possible to keep a single task and not have to create a separate task for every set of parameters.
j
i'm not sure i understand your problem. maybe a mini example helps. is this what you mean by single task, multiple parameters?
```python
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash)
def this_task_succeeds(x=None):
    return "foo"

@task(cache_key_fn=task_input_hash)
def this_task_fails(x=None):
    raise Exception

@flow()
def main():
    this_task_fails.submit(this_task_succeeds.submit("foo"))
    this_task_fails.submit(this_task_succeeds.submit("bar"))

if __name__ == "__main__":
    main()
```
d
Not exactly. Let me provide an example.
```python
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash)
def reused_task(x=None, param1=None, param2=None):
    pass

@flow()
def main():
    reused_task.submit("foo")
    reused_task.submit("bar")

if __name__ == "__main__":
    main()
```
I am reusing the task with different input parameters within the flow.
When I rerun the flow, both tasks run again even though they completed successfully with the same parameters.
I'm thinking it's because the parameter changed on the second call, so the cache is no longer valid, but I'm not sure. I'm trying to avoid creating a separate task for each set of parameters.
Essentially, I want to run this flow twice, and the second time through I want both tasks to stay cached and not rerun. Can I add a name or something with .with_options(name="task1"), or will that not differentiate them at all?
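Under an input-hash style key, each distinct argument set gets its own cache entry, so one task called with two parameter sets should leave two entries that a rerun with the same arguments can hit. The toy below illustrates that in plain Python; it is not Prefect, the `submit` helper and `executed` list are hypothetical, and whether `with_options(name=...)` affects Prefect's key is not something this sketch settles. In this model the arguments alone already keep the two calls apart.

```python
import hashlib
import json
from typing import Any, Callable, Dict, List, Tuple

CACHE: Dict[str, Any] = {}            # stand-in for results persisted across runs
executed: List[Tuple[str, ...]] = []  # records calls whose body really ran

def cache_key(fn: Callable[..., Any], args: Tuple[str, ...]) -> str:
    payload = json.dumps([fn.__qualname__, list(args)])
    return hashlib.sha256(payload.encode()).hexdigest()

def submit(fn: Callable[..., Any], *args: str) -> Any:
    """Hypothetical helper: reuse a cached result or run fn and cache it."""
    key = cache_key(fn, args)
    if key not in CACHE:
        CACHE[key] = fn(*args)
    return CACHE[key]

def reused_task(x: str, param1: str, param2: str) -> str:
    executed.append((x, param1, param2))
    return f"{x}:{param1}:{param2}"

def main() -> None:
    submit(reused_task, "foo", "x", "y")
    submit(reused_task, "bar", "p", "q")

main()  # first flow run: both calls execute
runs_after_first = len(executed)
main()  # second flow run with the same arguments: both calls hit the cache
runs_after_second = len(executed)
print(runs_after_first, runs_after_second, len(CACHE))  # 2 2 2
```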
j
where do param1 and param2 come from?
d
they are defined just like in my example, strings in the flow
j
ok. in your example they aren't defined anywhere. i think the important detail is whether these params change between flow runs.
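Why it matters whether a parameter changes between runs: with an input-based key, any difference in any argument, including a nested field, produces a different key and therefore a cache miss. The sketch below assumes a hypothetical config argument that embeds a per-run value (`started_at`); the helper names are illustrative, not Prefect APIs. `changed_arguments` compares the serialised arguments of two runs to point at the one that broke the cache.

```python
import hashlib
import json
from typing import Any, Dict, List

def key_for(arguments: Dict[str, Any]) -> str:
    """Hash a call's arguments; any change in any nested value changes the key."""
    return hashlib.sha256(json.dumps(arguments, sort_keys=True).encode()).hexdigest()

def changed_arguments(a: Dict[str, Any], b: Dict[str, Any]) -> List[str]:
    """Top-level argument names whose serialised values differ between two runs."""
    def ser(value: Any) -> str:
        return json.dumps(value, sort_keys=True)
    return sorted(k for k in a.keys() | b.keys() if ser(a.get(k)) != ser(b.get(k)))

# Hypothetical arguments from two flow runs: the config embeds a per-run value.
run1 = {"source": "sales", "config": {"cluster": "emr", "started_at": 1000.0}}
run2 = {"source": "sales", "config": {"cluster": "emr", "started_at": 2000.0}}

print(key_for(run1) == key_for(run2))  # False
print(changed_arguments(run1, run2))   # ['config']
```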
d
They are strings that are not changed between flow runs. Do they need to be defined somewhere else?
j
if i define them as strings (e.g. "x", "y", "p", "q"), this works exactly as expected 🤔
```python
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash)
def reused_task(x, param1, param2):
    pass

@flow()
def main():
    reused_task.submit("foo", "x", "y")
    reused_task.submit("bar", "p", "q")

if __name__ == "__main__":
    main()
```
d
Maybe I'm missing something in my example
```python
expected_sources = read_company_config(s3_bucket, s3_path_prefix)

ingestion_tasks = {
    source: run_module_on_emr_serverless_task.submit(source, spe_config, wait_for=[expected_sources])
    for source in expected_sources
}
```
This is how it is called.
The read_company_config task seems to be cached as expected,
but the run_module_on_emr_serverless_task tasks are not.
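One possibility (unconfirmed in this thread) is that an argument such as `spe_config` differs between runs, which would change the input hash. If so, a custom cache key that hashes only the stable parameters is one workaround. The sketch assumes Prefect's `cache_key_fn` is called as `(context, parameters)` and should return a string or `None`; `make_selected_params_key` is a hypothetical helper, and the parameter names `source` and `spe_config` are guessed from the call site above.

```python
import hashlib
import json
from typing import Any, Callable, Dict, Optional

def make_selected_params_key(
    task_label: str, *names: str
) -> Callable[[Any, Dict[str, Any]], Optional[str]]:
    """Build a cache_key_fn that hashes only the named parameters.

    The returned function takes (context, parameters); context is ignored here.
    """
    def cache_key_fn(context: Any, parameters: Dict[str, Any]) -> Optional[str]:
        selected = {name: parameters.get(name) for name in names}
        payload = json.dumps([task_label, selected], sort_keys=True, default=str)
        return hashlib.sha256(payload.encode()).hexdigest()
    return cache_key_fn

# Hypothetical: key the EMR task on "source" only, ignoring spe_config.
source_key = make_selected_params_key("run_module_on_emr_serverless_task", "source")

k_run1 = source_key(None, {"source": "sales", "spe_config": {"started_at": 1000.0}})
k_run2 = source_key(None, {"source": "sales", "spe_config": {"started_at": 2000.0}})
k_other = source_key(None, {"source": "billing", "spe_config": {"started_at": 1000.0}})
print(k_run1 == k_run2, k_run1 == k_other)  # True False
```

It would be attached with something like `@task(cache_key_fn=source_key)`. The trade-off is that a genuine change to `spe_config` would then be served from a stale cache entry, so this only fits if that config really should not invalidate results.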