Leon Kozlowski
03/12/2025, 3:42 PM
@flow(
    name="sandbox",
    retries=2,
    persist_result=True,
    result_serializer="compressed/pickle",
    result_storage="s3-bucket/dev-task-result-cache",
    cache_result_in_memory=False,
    **opsgenie_hooks
)
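Results that live only in process memory disappear if the process dies (for example, a pod killed for OOM). That is why `persist_result=True` with an external `result_storage` (here, the `s3-bucket/dev-task-result-cache` block) matters for skipping finished work. The sketch below imitates that with a local directory. Each result is pickled and then compressed, and a second store object, standing in for a fresh pod, reads it back from disk. `CompressedPickleStore` is a hypothetical helper, and `lzma` is an assumed choice of compressor. This is not how Prefect implements `result_serializer` or `result_storage`.

```python
import hashlib
import lzma
import pickle
import tempfile
from pathlib import Path
from typing import Any


class CompressedPickleStore:
    """Hypothetical file-backed result store (a stand-in, not Prefect's API)."""

    def __init__(self, root: Path) -> None:
        self.root = root
        self.root.mkdir(parents=True, exist_ok=True)

    def _path(self, key: str) -> Path:
        # Hash the key so any string maps to a safe file name.
        return self.root / hashlib.sha256(key.encode("utf-8")).hexdigest()

    def exists(self, key: str) -> bool:
        return self._path(key).exists()

    def write(self, key: str, value: Any) -> None:
        # "compressed/pickle": serialize with pickle, then compress the bytes.
        self._path(key).write_bytes(lzma.compress(pickle.dumps(value)))

    def read(self, key: str) -> Any:
        # Reverse the steps: decompress, then unpickle.
        return pickle.loads(lzma.decompress(self._path(key).read_bytes()))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # "Pod 1" persists a result; its in-memory state is then thrown away.
        CompressedPickleStore(Path(tmp)).write("object-5", {"rows": 1000})
        # "Pod 2" is a fresh store object that can only see what is on disk.
        fresh = CompressedPickleStore(Path(tmp))
        print(fresh.exists("object-5"), fresh.read("object-5"))
```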
Bianca Hoch
03/13/2025, 6:47 PM
Leon Kozlowski
03/13/2025, 6:51 PM
Leon Kozlowski
03/13/2025, 6:51 PM
Bianca Hoch
03/13/2025, 6:51 PM
Leon Kozlowski
03/13/2025, 6:51 PM
Leon Kozlowski
03/13/2025, 6:53 PM
from typing import Dict, List, Optional

from prefect import flow, get_run_logger
from prefect.context import get_run_context

# TABLES, opsgenie_hooks, get_start_end_bounds, generate_date_batches,
# sync_object, load and quote come from elsewhere in the project.


@flow(name="data-api-ingestion", **opsgenie_hooks)
def data_api_ingestion(
    tables: List[Dict] = TABLES,
    start_date: Optional[str] = None,  # type: ignore
    years: int = 0,
    months: int = 0,
    days: int = -1,
    hours: int = 0,
    minutes: int = 0,
    initial_sync: bool = False,
    initial_sync_start_date: Optional[str] = None,
    initial_sync_end_date: Optional[str] = None,
    initial_sync_chunk_type: Optional[str] = None,
    initial_sync_chunk_size: Optional[int] = None,
):
    logger = get_run_logger()
    context = get_run_context()
    expected_start_time = context.flow_run.expected_start_time  # type: ignore
    if not start_date:
        start_date = str(expected_start_time.date())  # type: ignore
    start, end = get_start_end_bounds(
        datetime=start_date,
        years=years,
        months=months,
        days=days,
        hours=hours,
        minutes=minutes,
        format="%Y-%m-%d %H:%M:%S",
    )
    for table in tables:
        if initial_sync:
            # Use the explicit end date if given, otherwise the computed bound
            if initial_sync_end_date:
                end_datetime = initial_sync_end_date
            else:
                end_datetime = end
            daily_batches = generate_date_batches(
                start_datetime=initial_sync_start_date,
                end_datetime=end_datetime,
                interval_type=initial_sync_chunk_type,
                interval_size=initial_sync_chunk_size,
            )
            logger.info(
                f"Running {len(daily_batches)} initial sync batches for table: {table['name']}"
            )
            # One sync + load per batch window
            for batch_start, batch_end in daily_batches:
                table["start"] = batch_start.strftime("%Y-%m-%d %H:%M:%S")
                table["end"] = batch_end.strftime("%Y-%m-%d %H:%M:%S")
                sync = sync_object(table=table)
                load(
                    sync=quote(sync),
                    operation="INSERT",
                    slice_count=48,
                    expected_start_time=expected_start_time.to_datetime_string(),
                )
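The loop above depends on `generate_date_batches`, which isn't shown in the thread. A hypothetical version could look like the sketch below. It assumes both dates arrive as strings in the same `%Y-%m-%d %H:%M:%S` format used elsewhere in the flow, and that `interval_type` names a `timedelta` unit (`days`, `hours`, `minutes`). Each `(batch_start, batch_end)` pair is one unit of work, which is also the natural granularity for caching.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

FMT = "%Y-%m-%d %H:%M:%S"


def generate_date_batches(
    start_datetime: str,
    end_datetime: str,
    interval_type: str,
    interval_size: int,
) -> List[Tuple[datetime, datetime]]:
    """Hypothetical chunker: split [start, end) into consecutive windows."""
    if interval_type not in {"days", "hours", "minutes"}:
        raise ValueError(f"unsupported interval_type: {interval_type!r}")
    if interval_size <= 0:
        raise ValueError("interval_size must be positive")
    # e.g. interval_type="days", interval_size=1 -> timedelta(days=1)
    step = timedelta(**{interval_type: interval_size})
    cursor = datetime.strptime(start_datetime, FMT)
    end = datetime.strptime(end_datetime, FMT)
    batches = []
    while cursor < end:
        # Clip the final window so it never runs past `end`.
        batch_end = min(cursor + step, end)
        batches.append((cursor, batch_end))
        cursor = batch_end
    return batches


for batch_start, batch_end in generate_date_batches(
    "2025-03-01 00:00:00", "2025-03-03 12:00:00", "days", 1
):
    print(batch_start, "->", batch_end)
```

With a 2.5-day range and 1-day chunks, this yields three windows, the last one clipped at noon.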
Bianca Hoch
03/13/2025, 8:57 PM
> Let's say the first 5 iterations of the task were successful: would those be tracked properly and not retried?
Just to make sure I'm following: if the first 5 iterations succeed, you're wondering whether their results would be cached and not retried in the event the flow fails?
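The scenario being asked about can be simulated in plain Python: iterations 1-5 succeed, iteration 6 fails, and the retry should redo only 6 onward. This is a sketch, not Prefect's engine. `run_with_flow_retries` and `replicate` are hypothetical, a dict stands in for persisted task results, and the failure is a simulated `MemoryError` (a real OOM kill ends the process instead of raising). Because the cache is keyed only on the task's input, the retried pass skips objects 1-5.

```python
from typing import Callable, Dict, List, Set


def run_with_flow_retries(
    objects: List[int],
    replicate: Callable[[int], str],
    cache: Dict[int, str],
    retries: int,
) -> List[str]:
    """Re-run the whole "flow" on failure, skipping inputs already cached.

    `cache` is keyed only on the task input (like an inputs-based policy)
    and stands in for persisted results that survive a retry.
    """
    attempt = 0
    while True:
        try:
            results = []
            for obj in objects:
                if obj not in cache:
                    # Cache miss: actually run the task and store its result.
                    cache[obj] = replicate(obj)
                results.append(cache[obj])
            return results
        except MemoryError:
            attempt += 1
            if attempt > retries:
                raise


calls: List[int] = []
failed_once: Set[int] = set()


def replicate(obj: int) -> str:
    calls.append(obj)
    # Simulate an OOM on object 6, but only on the first attempt.
    if obj == 6 and obj not in failed_once:
        failed_once.add(obj)
        raise MemoryError("simulated OOM on object 6")
    return f"object-{obj}"


results = run_with_flow_retries(list(range(1, 11)), replicate, cache={}, retries=2)
print(calls)  # [1, 2, 3, 4, 5, 6, 6, 7, 8, 9, 10]
```

Object 6 appears twice in `calls` (the failed attempt and the retry), while 1-5 run exactly once.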
Leon Kozlowski
03/13/2025, 9:23 PM
Bianca Hoch
03/14/2025, 4:45 PM
from time import sleep

from prefect import flow, task
from prefect.cache_policies import INPUTS


# The INPUTS cache policy caches the task's result based on its input parameters
@task(retries=3, retry_delay_seconds=[2], cache_policy=INPUTS)
def hello_task(name_input: str):
    if name_input == "Diana":
        raise ValueError("Sorry, Diana is not allowed in this flow!")
    print(f"Hey! What's up? {name_input}")
    sleep(2)
    return name_input


@flow(log_prints=True)
def hello_flow(names: list[str]):
    results = []
    for name in names:
        result = hello_task(name)
        results.append(result)
    return results


if __name__ == "__main__":
    names_list = ["Leon", "Bianca", "Marvin", "Prefect", "Diana"]
    # Run this script twice:
    # the first run executes every task and raises an error for Diana;
    # the second run reuses the cached results for every name that already succeeded
    hello_flow(names_list)
In this example I'm using `INPUTS` as the cache policy for the task, which computes the cache key from the task's inputs. That's a bit different from the default cache policy, which also includes the flow run ID in the cache key (swapping `INPUTS` for `DEFAULT` would cause the tasks to re-run, instead of using the cached values, when the flow is run again as a new flow run).
Alexander Azzam
03/14/2025, 7:51 PM
Bianca Hoch
03/14/2025, 7:54 PM
Chris White
`ValueError` within the flow and setting `retries=3` on the flow instead of the task.
Leon Kozlowski
03/14/2025, 8:04 PM
`prefect.yaml` - and let's say our flow replicates 10 objects from some API. If my flow were to fail replicating object 6 with an OOM error, a retry would spin up a new pod and replicate objects 6-10.
Chris White
Leon Kozlowski
03/14/2025, 8:07 PM
Leon Kozlowski
03/14/2025, 8:08 PM
Chris White
Leon Kozlowski
03/14/2025, 8:10 PM
> The `INPUTS` cache policy would then extend that behavior across all reruns of the flow
When you say all reruns, does that mean globally, or reruns of an atomic flow run?
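The "globally vs. one flow run" question comes down to what goes into the cache key. The sketch below is a simplification, not Prefect's actual key computation. `inputs_key` and `default_like_key` are hypothetical helpers, and the run IDs are made-up strings. It assumes the default policy mixes the flow run ID into the key (as mentioned earlier in this thread) along with the inputs and task source. Under that assumption, a default-style key only repeats within one flow run and that run's retries. An inputs-only key repeats for any run with the same inputs, provided the earlier result is still available in the same result storage.

```python
import hashlib
import json
from typing import Any, Dict


def _digest(payload: Dict[str, Any]) -> str:
    # sort_keys makes the JSON, and therefore the hash, deterministic.
    encoded = json.dumps(payload, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def inputs_key(inputs: Dict[str, Any]) -> str:
    # Inputs only: identical for any flow run that passes the same inputs.
    return _digest({"inputs": inputs})


def default_like_key(inputs: Dict[str, Any], task_source: str, flow_run_id: str) -> str:
    # Inputs plus task source plus flow run ID: only stable within one flow run.
    return _digest({"inputs": inputs, "source": task_source, "run_id": flow_run_id})


inputs = {"name_input": "Leon"}
source = "def hello_task(name_input): ..."

# A retry of the same flow run keeps its run ID, so the key is unchanged.
print(default_like_key(inputs, source, "run-a") == default_like_key(inputs, source, "run-a"))  # True
# A brand-new flow run changes the default-like key...
print(default_like_key(inputs, source, "run-a") == default_like_key(inputs, source, "run-b"))  # False
# ...but the inputs-only key does not depend on the run at all.
print(inputs_key(inputs) == inputs_key(dict(inputs)))  # True
```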
Chris White
Leon Kozlowski
03/18/2025, 2:29 PM