If I'm doing flow level caching - is there any way...
# ask-community
l
If I'm doing flow level caching - is there any way I can provide a dir in s3 to delineate on a per flow basis? example
Copy code
@flow(
    name="sandbox",
    retries=2,
    persist_result=True,
    result_serializer="compressed/pickle",
    result_storage="s3-bucket/dev-task-result-cache",
    cache_result_in_memory=False,
    **opsgenie_hooks
)
b
Hi Leon! I haven't been able to find a way to change the directory on a per-flow basis (yet). I know you can specify a folder to use for storage in the S3 bucket block. I imagine you could create a block for each flow which references a different folder (probably not what you're looking for though, understandably).
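If you did want to go that route, here's a rough sketch of what it could look like, assuming the prefect-aws S3Bucket block (the bucket, folder, credentials, and block names below are all made up):
Copy code
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

# One block per flow, each pointing at a different folder in the same bucket
sandbox_results = S3Bucket(
    bucket_name="my-results-bucket",                # hypothetical bucket
    bucket_folder="sandbox",                        # per-flow folder
    credentials=AwsCredentials.load("dev-aws"),     # hypothetical credentials block
)
sandbox_results.save("sandbox-results", overwrite=True)

# Then reference it from the flow, e.g.:
# @flow(name="sandbox", persist_result=True, result_storage="s3-bucket/sandbox-results")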
l
I mean ultimately, if Prefect manages it regardless, I have no reason for it to be in different directories
I had 1 more question on retries + caching tho
b
Go for it 👂
l
If I have a loop within my flow definition that calls a task, and let's say the first 5 iterations of the task were successful, would those be tracked properly and not retried? I'll get a code sample for you
Copy code
@flow(name="data-api-ingestion", **opsgenie_hooks)
def data_api_ingestion(
    tables: List[Dict] = TABLES,
    start_date: Optional[str] = None,  # type: ignore
    years: int = 0,
    months: int = 0,
    days: int = -1,
    hours: int = 0,
    minutes: int = 0,
    initial_sync: bool = False,
    initial_sync_start_date: Optional[str] = None,
    initial_sync_end_date: Optional[str] = None,
    initial_sync_chunk_type: Optional[str] = None,
    initial_sync_chunk_size: Optional[int] = None,
):
    logger = get_run_logger()
    context = get_run_context()
    expected_start_time = context.flow_run.expected_start_time  # type: ignore

    if not start_date:
        start_date = str(expected_start_time.date())  # type: ignore

    start, end = get_start_end_bounds(
        datetime=start_date,
        years=years,
        months=months,
        days=days,
        hours=hours,
        minutes=minutes,
        format="%Y-%m-%d %H:%M:%S",
    )

    for table in tables:
        if initial_sync:
            if initial_sync_end_date:
                end_datetime = initial_sync_end_date
            else:
                end_datetime = end

            daily_batches = generate_date_batches(
                start_datetime=initial_sync_start_date,
                end_datetime=end_datetime,
                interval_type=initial_sync_chunk_type,
                interval_size=initial_sync_chunk_size,
            )
            logger.info(
                f"Running {len(daily_batches)} initial sync batches for table: {table['name']}"
            )
            for batch_start, batch_end in daily_batches:
                table["start"] = batch_start.strftime("%Y-%m-%d %H:%M:%S")
                table["end"] = batch_end.strftime("%Y-%m-%d %H:%M:%S")

                sync = sync_object(table=table)

                load(
                    sync=quote(sync),
                    operation="INSERT",
                    slice_count=48,
                    expected_start_time=expected_start_time.to_datetime_string(),
                )
b
Let's say the first 5 iterations of the task were successful - would those be tracked properly and not retried?
Just to make sure I'm following - if the first 5 iterations succeed, you're wondering if their results would be cached and not retried in the event the flow fails?
l
Yes exactly
b
Circling back here - the answer is yes, you can set it up like that! Here's a little example for you to test with:
Copy code
from time import sleep
from prefect import flow, task
from prefect.cache_policies import INPUTS


# The input cache policy will cache the result of the task based on the input parameters
@task(retries=3, retry_delay_seconds=[2], cache_policy=INPUTS)
def hello_task(name_input: str):
    if name_input == "Diana":
        raise ValueError("Sorry, Diana is not allowed in this flow!")
    print(f"Hey! What's up? {name_input}")
    sleep(2)
    return name_input


@flow(log_prints=True)
def hello_flow(names: list[str]):
    results = []
    for name in names:
        result = hello_task(name)
        results.append(result)
    return results


if __name__ == "__main__":
    names_list = ["Leon", "Bianca", "Marvin", "Prefect", "Diana"]
    # First run will run all tasks, and raise an error for Diana
    # Second run will use the cache for all names that were already run
    hello_flow(names_list)
    sleep(10)
In this example I'm using "INPUTS" as the cache policy for the task. This will enable you to use the task's input to compute the cache key. It's a bit different from the default cache policy, which uses the flow run ID in the cache key as well (swapping "INPUTS" out for "DEFAULT" would result in the tasks re-running and not using the cached values when the flow is re-run).
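For comparison, here's what swapping in the default policy explicitly would look like (same hello_task body as in the example above):
Copy code
from prefect import task
from prefect.cache_policies import DEFAULT


# DEFAULT also keys on the flow run ID, so cached values only apply within
# a single flow run (and its retries), not across brand-new flow runs
@task(retries=3, retry_delay_seconds=[2], cache_policy=DEFAULT)
def hello_task(name_input: str):
    ...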
a
@Chris White does anything ring a bell here?
b
Ooo - yes please, share your thoughts 💡
c
Yea I basically agree with @Bianca Hoch, although there's ambiguity in the word "retry": in Prefect, a retry uses the same task / flow run ID, so the DEFAULT cache policy would also prevent previously successful tasks within a retrying flow run from rerunning. The INPUTS cache policy would then extend that behavior across all reruns of the flow, that is, the tasks would load their cached value so long as the inputs were unchanged, regardless of flow run ID. So a different example to experiment with and compare against would be to raise a ValueError within the flow and set retries=3 on the flow instead of the task.
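A minimal sketch of that experiment, borrowing Bianca's hello_task (the unconditional raise just forces the flow retries so you can watch the tasks load their cached values instead of rerunning):
Copy code
from time import sleep
from prefect import flow, task


# No cache_policy override here, so the DEFAULT policy applies -
# its cache key includes the flow run ID, which retries share
@task
def hello_task(name_input: str):
    print(f"Hey! What's up? {name_input}")
    sleep(2)
    return name_input


# Retries moved to the flow instead of the task
@flow(retries=3, log_prints=True)
def hello_flow(names: list[str]):
    for name in names:
        hello_task(name)
    # Fail the flow itself so it retries; the tasks above should not rerun
    raise ValueError("Simulated flow-level failure")


if __name__ == "__main__":
    hello_flow(["Leon", "Bianca", "Marvin", "Prefect", "Diana"])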
l
I have kind of a niche feature request - but it would be extremely nifty for our infra at Flock. Background: we use k8s + karpenter for dynamic infra management. If a failed run could retry in a new pod from the point of failure, that would be epic. The reason: let's say some of our API ingestion has a heavy day volume-wise. The sizing is determined by the mem limit + cpu limit in our prefect.yaml - and let's say our flow replicates 10 objects from some API. If my flow were to fail replicating object 6 with an OOM, a retry would spin up a new pod and replicate objects 6-10.
c
Yea, essentially a retry at the infrastructure level?
l
Right but with those cached task results managed
Does that make sense - I can elaborate if not
c
I think this might be achievable with the INPUTS cache policy + an automation that reschedules failed flow runs. And actually you may not even need to change the cache policy; an automation that resets failed states to scheduled states might work (there would need to be some guard to avoid infinite retries though, not sure about that part)
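Very rough sketch of that automation through the Python client, assuming Prefect 3's automations API (the trigger/action classes and field names here are from memory, so worth double-checking against the docs, and there's no guard against infinite retries):
Copy code
from prefect.automations import Automation
from prefect.events.schemas.automations import EventTrigger, Posture
from prefect.events.actions import ChangeFlowRunState
from prefect.client.schemas.objects import StateType

# React to flow run failure events and push the run back to Scheduled;
# this preserves the flow run ID, so the default cache scope still applies
Automation(
    name="reschedule-failed-runs",
    trigger=EventTrigger(
        expect={"prefect.flow-run.Failed"},
        match={"prefect.resource.id": "prefect.flow-run.*"},
        posture=Posture.Reactive,
        threshold=1,
    ),
    actions=[
        ChangeFlowRunState(
            state=StateType.SCHEDULED,
            message="Infra-level retry: rescheduling failed run",
        )
    ],
).create()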
l
This line is giving me pause tho
The INPUTS cache policy would then extend that behavior across all reruns of the flow
When you say all reruns - does that mean globally, or reruns of an atomic flow run?
c
Globally - whether a task loads a cached value would depend entirely on its inputs when using the INPUTS cache policy. If you instead set up an automation that resets failed states to scheduled (which would preserve the flow run ID), then the default cache configuration would remain scoped to individual flow runs, which sounds more like what you are looking for
l
interesting - thanks for the info all