Hi All - a quick question, I'm curious if there is...
# ask-community
d
Hi All - a quick question: is there a way to cache a result only if the task is successful? I'd like this code to cache each task's result when that task succeeds, and to cache the flow's result only when all of its subtasks succeed. The code below uses a lightweight placeholder to stand in for expensive compute, where caching the result is a big cost saving. At the moment, when a task fails, the caching mechanism saves the exception message instead.
Copy code
from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
from typing import List
from random import random
from prefect.tasks import task_input_hash
import hashlib

def hash_string(text):
    """Return the SHA-256 hex digest of ``text`` rendered as a string.

    ``text`` is first formatted through an f-string, so non-string values
    are accepted; the rendered text is UTF-8 encoded before hashing.
    """
    encoded = f"{text}".encode('utf-8')
    return hashlib.sha256(encoded).hexdigest()

# Relative directory passed as the LocalFileSystem basepath for main_entry's
# result storage.
base = "./tests/results_cache"

@task()
def generate_tasks():
    """Build the fixed list of 15 placeholder payload labels.

    Returns:
        A list of strings "Task - 0" through "Task - 14".
    """
    labels = []
    for index in range(15):
        labels.append(f"Task - {index}")
    return labels


@task(retries=2, cache_key_fn=task_input_hash)
def worker(job_id, task_payload: str):
    """Hash one payload, failing at random to simulate a flaky job.

    Args:
        job_id: Identifier embedded in both the result and the error message.
        task_payload: Text to be hashed with ``hash_string``.

    Returns:
        A string of the form "Hashed payload <digest> for <job_id>".

    Raises:
        Exception: When the random draw is below 0.33.
    """
    # Draw once per invocation; a draw under .33 counts as a failure.
    is_flaky = random() < .33
    if is_flaky:
        raise Exception(f'Flakey Function Strikes Again - {job_id}')

    digest = hash_string(task_payload)
    return f"Hashed payload {digest} for {job_id}"

@flow()
def modal_batch(job_id, tasks: List[str]):
    """Submit one ``worker`` run per payload and collect every result.

    Args:
        job_id: Identifier forwarded to each ``worker`` call.
        tasks: Payload strings, one per ``worker`` submission.

    Returns:
        The list of ``worker`` results, in the same order as ``tasks``.
    """
    # Submit every payload before asking for any result.
    pending = [worker.submit(job_id, payload) for payload in tasks]

    return [handle.result() for handle in pending]


@flow(
    result_storage=LocalFileSystem(basepath=base),
    result_serializer=JSONSerializer(),
)
def main_entry(job_id):
    """Top-level flow: generate payloads, run the batch, print the results.

    Args:
        job_id: Identifier forwarded to ``modal_batch``.

    NOTE(review): this flow has no return statement, so it returns None;
    confirm whether the batch results were meant to be returned.
    """
    payloads = generate_tasks()

    batch_results = modal_batch(job_id, payloads)

    print('CONTINUE HERE')
    print(batch_results)

# Script entry point: run the top-level flow once with a fixed example job id.
if __name__ == "__main__":
    job_id = 'catcher-in-the-rye'
    main_entry(job_id)
b
+1 also interested in this