Derek
03/01/2024, 3:28 PM
from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
from typing import List
from random import random
from prefect.tasks import task_input_hash
import hashlib
def hash_string(text: object) -> str:
    """Return the SHA-256 hex digest of *text*.

    Args:
        text: Any value; it is rendered with f-string formatting and
            encoded as UTF-8 before hashing, so ``123`` and ``"123"``
            produce the same digest.

    Returns:
        The 64-character lowercase hexadecimal digest.
    """
    return hashlib.sha256(f"{text}".encode('utf-8')).hexdigest()
# Relative directory handed to LocalFileSystem(basepath=...) as the result
# storage location for the main_entry flow.
base = "./tests/results_cache"
@task()
def generate_tasks() -> List[str]:
    """Build the demo payloads.

    Returns:
        Fifteen strings, ``"Task - 0"`` through ``"Task - 14"``.
    """
    return [f"Task - {x}" for x in range(15)]
@task(retries=2, cache_key_fn=task_input_hash)
def worker(job_id, task_payload: str):
    """Hash one payload, failing at random to simulate a flaky step.

    The task is declared with ``retries=2`` and uses ``task_input_hash`` as
    its cache key function, so the cache key is derived from the inputs
    (``job_id`` and ``task_payload``).

    Args:
        job_id: Identifier of the job this payload belongs to; it is only
            interpolated into the result and error messages.
        task_payload: The text to hash.

    Returns:
        A string of the form ``"Hashed payload <sha256 hex> for <job_id>"``.

    Raises:
        Exception: When the random draw falls below 0.33 (the deliberate
            flakiness this example exists to exercise).
    """
    # Intentional failure injection: roughly one call in three raises.
    if random() < 0.33:
        raise Exception(f'Flakey Function Strikes Again - {job_id}')
    return f"Hashed payload {hash_string(task_payload)} for {job_id}"
@flow()
def modal_batch(job_id, tasks: List[str]):
    """Fan the payloads out to ``worker`` and collect the results.

    Args:
        job_id: Identifier forwarded unchanged to every ``worker`` call.
        tasks: Payload strings; one ``worker`` run is submitted per entry.

    Returns:
        The ``worker`` results, in the same order as *tasks*.
    """
    # Submit everything before asking for any result, so no submission has
    # to wait on an earlier run finishing. The loop variable is named
    # ``payload`` so it does not shadow the imported ``task`` decorator.
    futures = [worker.submit(job_id, payload) for payload in tasks]
    return [future.result() for future in futures]
@flow(result_storage=LocalFileSystem(basepath=base), result_serializer=JSONSerializer())
def main_entry(job_id):
    """Top-level flow: generate payloads, process them, print the results.

    The flow is configured with local-filesystem result storage rooted at
    ``base`` and a JSON result serializer.

    Args:
        job_id: Identifier passed through to ``modal_batch``.

    Returns:
        None. The collected results are only printed.
    """
    tasks = generate_tasks()
    resource_list = modal_batch(job_id, tasks)
    # Marker showing that execution got past the batch sub-flow.
    print('CONTINUE HERE')
    print(resource_list)
# Script entry point: run the whole pipeline once with a fixed demo job id.
if __name__ == "__main__":
    job_id = 'catcher-in-the-rye'
    main_entry(job_id)
Brad
03/13/2024, 2:50 AM