Tom Shaffner
01/11/2022, 3:48 PM
result=LocalResult(location='{plan}_plan_data.prefect')
In my case my mappings are "plans". The problem, though, is that in one case a plan/map pulls data and caches it, and then a different plan/map subsequently reads that same data from cache and uses it! Any idea what might cause this kind of behavior? It causes a bunch of my plan/maps to just not work.
Kevin Kho
Tom Shaffner
01/11/2022, 3:53 PM
def update_mrp_pegging_for_plan(plan):
    logger.debug(f'Mapped task starting for {plan}')
    plan_data_is_current = check_pull_date_of_(plan=plan)
    with case(plan_data_is_current, False):  # If date is old, run the update
        logger.info(f'{plan} data is out of date, initiating update.')
        df1 = pull_mrp_plan_data_for(plan, prod=ORACLE_PROD)
        df = set_data_types(df1, plan)
        del df1
        d = delete_plan_from_mrp_pegging_table(upstream_tasks=[df], mrp_pegging_table=DATA_DESTINATION_TABLE_NAME, plan_to_delete=plan)
        u = upload_to_table(upstream_tasks=[d], df=df, destination_table=DATA_DESTINATION_TABLE_NAME, plan=plan, append=True)
        logger.info(f'Update complete for plan: {plan}')
    with case(plan_data_is_current, True):  # If data is current, just log it
        logger.info(f'{plan} data is current, skipping update.')
    return plan_data_is_current

with Flow(FLOW_NAME, result=LocalResult(), schedule=schedule) as flow:
    logger.info(f"{FLOW_NAME} Flow initiated, running in {file_path}")
    # Get the MRP Plans to update
    plans_to_check = pull_oracle_data_as_list_via("QUERY TO GET LIST OF PLANS")
    logger.debug(f'Plans to check {plans_to_check}')
    apply_map(update_mrp_pegging_for_plan, plans_to_check)
Tom Shaffner
01/11/2022, 3:54 PM
pull_mrp_plan_data_for caches data, and then a different map uses that data in the next function (set_data_types), resulting in the same data getting uploaded twice and other data going missing.
Tom Shaffner
01/11/2022, 3:54 PM
Kevin Kho
What does pull_mrp_plan_data_for look like? The one above? result=LocalResult(location='{plan}_plan_data.prefect')
Tom Shaffner
01/11/2022, 4:02 PM
Kevin Kho
Tom Shaffner
01/11/2022, 4:02 PM
@task(name='Oracle Pull', task_run_name='Pull Oracle Data for {plan}', max_retries=3, retry_delay=timedelta(minutes=randint(5, 45)),
      cache_for=timedelta(hours=10), result=LocalResult(location='{plan}_plan_data.prefect'), timeout=(4*60*60), cache_validator=prefect.engine.cache_validators.all_parameters)
def pull_mrp_plan_data_for(plan, prod=True, cache_flag=False) -> DataFrame:
Tom Shaffner
01/11/2022, 4:03 PM
pull_oracle_data_as_list_via query)
Tom Shaffner
01/11/2022, 4:06 PM
pull_mrp_plan_data_for can vary greatly, ranging from 20 minutes to 2 hours. And that's the problem; it seems to pull the first round for my thread limit, and then reuse those caches for the remainder.
Kevin Kho
cache_key. The cache_key allows caching to be respected across flows. I am wondering if the default cache_key is being applied to all mapped runs.
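For reference, the cache_key in Prefect 1.x is a plain task-level string; here is a minimal sketch (placeholder task names, not the thread's actual code) of how a shared key ties otherwise separate task runs to one cache entry, which is the behavior being suspected for the mapped runs:

from datetime import timedelta
from prefect import task
from prefect.engine.cache_validators import all_parameters

# Hypothetical illustration: both tasks point at the same cache entry
# because they declare the same literal cache_key string.
@task(cache_for=timedelta(hours=10), cache_key="oracle-pull-cache", cache_validator=all_parameters)
def pull_one(plan):
    ...

@task(cache_for=timedelta(hours=10), cache_key="oracle-pull-cache", cache_validator=all_parameters)
def pull_two(plan):
    ...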
Kevin Kho
cache_key
Tom Shaffner
01/11/2022, 5:21 PM
Tom Shaffner
01/11/2022, 5:21 PM
Kevin Kho
cache_key but I am digging more into it.
Kevin Kho
cache_key and the cache_key can't be templated as it only handles a string. Will open an issue to see what can be done about it.
Tom Shaffner
01/11/2022, 9:45 PM
Tom Shaffner
01/11/2022, 9:47 PM
Kevin Kho
Tom Shaffner
01/12/2022, 4:14 PM
Tom Shaffner
01/12/2022, 4:19 PM
@task(name='Oracle Pull', task_run_name='Pull Oracle Data for {plan}', max_retries=3, retry_delay=timedelta(minutes=randint(5, 45)),
      cache_for=timedelta(hours=10), result=LocalResult(location='{plan}_plan_data.prefect'), timeout=(4*60*60), cache_validator=prefect.engine.cache_validators.all_parameters)
Is there something I'm doing differently here other than not having a timestamp? Or is that the difference?
Kevin Kho
One way of caching is cache_for + cache_validator. The other is using targets. So the mapping + caching doesn't work because of the shared cache_key, but target will work over mapping because it's filename based rather than cache_key based. So target is more common than cache_for + cache_validator. The problem in your use case, though, is that moving to target will make the toggle you have not work, since there is no cache_validator for target.
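For illustration, a minimal sketch of the target-based approach described above, assuming Prefect 1.x (the decorator arguments are a guess at how the thread's task might be adapted, not its actual code): the target filename is templated per mapped input, so each plan caches to its own file, but there is no cache_validator hook to force a refresh.

from datetime import timedelta
from pandas import DataFrame
from prefect import task
from prefect.engine.results import LocalResult

# Hypothetical variant: file-based caching via `target`. If the target file
# already exists, the task run is skipped and the stored result is loaded.
@task(name='Oracle Pull',
      task_run_name='Pull Oracle Data for {plan}',
      max_retries=3,
      retry_delay=timedelta(minutes=10),
      result=LocalResult(),
      target='{plan}_plan_data.prefect',  # templated with the task's `plan` input
      checkpoint=True)
def pull_mrp_plan_data_for(plan, prod=True) -> DataFrame:
    ...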
Tom Shaffner
01/12/2022, 5:17 PM
result and target. And I have to stick with what I've got due to the validator. Understood.
In that case, is the issue visible on GitHub somewhere so I can track the progress to a resolution and know when it's done?
Also, is there any info on the job to clean up data? Or do I just create a job to run a bash command to delete old files?
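For what it's worth, a cleanup job can be as simple as a small scheduled script that deletes stale result files. A rough sketch, where the result directory, filename pattern, and age threshold are assumptions to adapt:

import time
from pathlib import Path

# Assumed default LocalResult directory (~/.prefect/results); adjust if a
# custom dir= was passed to LocalResult.
RESULT_DIR = Path.home() / ".prefect" / "results"
MAX_AGE_SECONDS = 10 * 60 * 60  # e.g. match the 10-hour cache_for window

for cached_file in RESULT_DIR.glob("*_plan_data.prefect"):
    if time.time() - cached_file.stat().st_mtime > MAX_AGE_SECONDS:
        cached_file.unlink()  # delete stale cache files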