t
I have a case where mapped runs seem to read the cache from other mapped runs. I'm using an approach like the Complex Mapped Pipelines and have it set up to write the output of a data pull task to a local result, cached for 10 hours, using result=LocalResult(location='{plan}_plan_data.prefect'). In my case my mappings are "plans". The problem though is that in one case a plan/map pulls data and caches it, and then a different plan/map subsequently reads that same data from cache and uses it! Any idea what might cause this kind of behavior? It causes a bunch of my plan/maps to just not work.
k
Do you have a small example you can share? I think you need to template the result for all of the tasks in the complex pipeline?
t
def update_mrp_pegging_for_plan(plan):
    logger.debug(f'Mapped task starting for {plan}')
    plan_data_is_current = check_pull_date_of_(plan=plan)
    with case(plan_data_is_current, False):  # If date is old, run the flow
        logger.info(f'{plan} data is out of date, initiating update.')
        df1 = pull_mrp_plan_data_for(plan, prod=ORACLE_PROD)
        df = set_data_types(df1, plan)
        del df1
        d = delete_plan_from_mrp_pegging_table(upstream_tasks=[df], mrp_pegging_table=DATA_DESTINATION_TABLE_NAME, plan_to_delete=plan)
        u = upload_to_table(upstream_tasks=[d], df=df, destination_table=DATA_DESTINATION_TABLE_NAME, plan=plan, append=True)
        logger.info(f'Update complete for plan: {plan}')

    with case(plan_data_is_current, True):  # If data is current, just log it
        logger.info(f'{plan} data is current, skipping update.')
    return plan_data_is_current

with Flow(FLOW_NAME, result=LocalResult(), schedule=schedule) as flow:
    logger.info(f"{FLOW_NAME} Flow initiated, running in {file_path}")
    # Get the MRP Plans to update
    plans_to_check = pull_oracle_data_as_list_via("QUERY TO GET LIST OF PLANS")
    logger.debug(f'Plans to check {plans_to_check}')
    apply_map(update_mrp_pegging_for_plan, plans_to_check)
The issue is that pull_mrp_plan_data_for caches data, and then a different map uses that cached data in the next function (set_data_types), resulting in the same data getting uploaded twice and other data going missing.
That enough of an example?
k
Yeah, what does the result and cache for pull_mrp_plan_data_for look like? The one above?
result=LocalResult(location='{plan}_plan_data.prefect')
t
Correct
k
Ok will try this myself
t
@task(name='Oracle Pull', task_run_name='Pull Oracle Data for {plan}', max_retries=3,
      retry_delay=timedelta(minutes=randint(5, 45)), cache_for=timedelta(hours=10),
      result=LocalResult(location='{plan}_plan_data.prefect'), timeout=(4 * 60 * 60),
      cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_mrp_plan_data_for(plan, prod=True, cache_flag=False) -> DataFrame:
I'm normally working with 14-16 plans each time (pulled from the pull_oracle_data_as_list_via query). And the timing of pull_mrp_plan_data_for can vary greatly, ranging from 20 minutes to 2 hours. And that's the problem; it seems to pull fresh data for the first round of maps (up to my thread limit), and then reuse those caches for the remainder.
k
So caching takes in a cache_key. The cache_key allows caching to be respected across flows. I am wondering if the default cache_key is being applied to all mapped runs. I am checking with the team if the mapped tasks end up sharing the default cache_key.
t
I'm using this in production, so in the meantime I'll just remove caching on that task.
Do I need to remove the result flag too to avoid this? You know?
k
No you don't. Just the cache, I think. I think these mapped tasks share the same cache_key, but I am digging more into it.
So I have confirmed they share the same cache_key, and the cache_key can't be templated as it only handles a string. Will open an issue to see what can be done about it.
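To illustrate what I mean, here's a rough sketch (not your exact task; the explicit cache_key below is hypothetical since you aren't setting one, but the default one behaves the same way). cache_key is a plain string fixed when the task is defined, so every mapped child resolves to the same cache entry, even though the result location can be templated per plan:
from datetime import timedelta
from prefect import task
from prefect.engine.results import LocalResult

@task(
    cache_for=timedelta(hours=10),
    cache_key="mrp-plan-pull",  # one static string shared by ALL mapped runs
    result=LocalResult(location="{plan}_plan_data.prefect"),  # the location CAN be templated per plan
)
def pull_mrp_plan_data_for(plan, prod=True, cache_flag=False):
    ...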
t
Thanks @Kevin Kho. I have to say, I'm a bit surprised by this. Is it possible that I'm truly the first person to cache a task in a mapped flow of this sort? Similarly, the other issue I had that resulted in a bug was a case where flows finished but didn't return. Both of these seem to me like they should be pretty basic functionality; am I doing something deeply unusual that I keep hitting these bugs and no one has before?
I'm so new to Prefect that two bugs that disrupt reliability on such a basic level (in addition to caches not cleaning up after themselves) make me worry, and I want to check whether I'm causing this somehow.
k
Chatted with the team, and I think you are the first person to run into this because we did the “cache toggle”. More of our users use targets with templated names for mapping (with some kind of rounded timestamp in the filename to set the duration). I think we patched the flow finishing but not returning? For what it's worth though, I chatted with the team and we are aware that the caching experience for this specific situation is not great, and Prefect 2.0 has support for this, as cache_keys can be callables set at runtime. Not that I suggest using it quite yet as it's still in technical preview.
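Roughly what that looks like in 2.0, as a sketch based on the current preview docs (the API could still shift before release):
from datetime import timedelta
from prefect import task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=10))
def pull_mrp_plan_data_for(plan):
    # the cache key is computed at runtime from the task's inputs, so each
    # plan gets its own cache entry instead of one shared static string
    ...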
t
Okay, but in that case your other users must also have issues with the cache folder growing indefinitely, right? Has no one else ever hit the space limits from that? Or is it typical to need to set up cron jobs to clean up after it?
Also, I'm a bit confused, am I not using templated names now? I don't have the timestamp in it, but my filenames are mapped via the template:
@task(name='Oracle Pull', task_run_name='Pull Oracle Data for {plan}', max_retries=3, retry_delay=timedelta(minutes=randint(5,45)), cache_for=timedelta(hours=10), result=LocalResult(location='{plan}_plan_data.prefect'), timeout=(4*60*60), cache_validator=prefect.engine.cache_validators.all_parameters)
Is there something I'm doing differently here other than not having a timestamp? Or is that the difference?
k
It's typical to set up jobs to clean up the data. So there are two forms of caching. One is using cache_for + cache_validator. The other is using targets. The mapping + caching combination doesn't work because of the shared cache_key, but target will work over mapping because it's filename based rather than cache_key based. So target is more common than cache_for + cache_validator. The problem in your use case though is that moving to target would make the toggle you have not work, since there is no cache_validator for target.
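For completeness, the target-based pattern looks roughly like this (a sketch, not a drop-in replacement; I believe task inputs like {plan} and context values like {today} are available for templating the target the same way they are for result locations, and note your cache_flag toggle is gone because there's no validator to consult):
from prefect import task
from prefect.engine.results import LocalResult

@task(
    checkpoint=True,
    result=LocalResult(),
    # a new day produces a new filename, which is what stands in for a cache duration
    target="{plan}_plan_data_{today}.prefect",
)
def pull_mrp_plan_data_for(plan, prod=True):
    ...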
t
Ohh, so this is a difference between result and target, and I have to stick with what I've got due to the validator. Understood. In that case, is the issue visible on GitHub somewhere so I can track the progress to a resolution and know when it's done? Also, is there any info on the job to clean up data? Or do I just create a job to run a bash command to delete old files?
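For the cleanup I was picturing something along these lines, run on a daily schedule (the directory and age here are placeholders for wherever my results actually land):
import time
from pathlib import Path

# hypothetical location of the cached .prefect result files
CACHE_DIR = Path("~/.prefect/results").expanduser()
MAX_AGE_SECONDS = 24 * 60 * 60  # delete anything older than a day

for path in CACHE_DIR.glob("*_plan_data*.prefect"):
    if time.time() - path.stat().st_mtime > MAX_AGE_SECONDS:
        path.unlink()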