Jimmy Le
05/26/2023, 3:32 PM
Marvin
05/26/2023, 3:34 PM
`cache_key_fn` function paired with `cache_expiration`. This allows you to define arbitrary caching mechanisms. Here's an example of how to cache a task's output:
from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=1))
def my_dataframe_task():
    # Your code to generate a Pandas DataFrame
    pass
In this example, `cache_key_fn` is set to use the task's inputs as the cache key, and `cache_expiration` is set to expire the cache after one minute. You can customize the `cache_key_fn` function and the `cache_expiration` duration according to your needs.
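As a rough sketch of what customizing the key could look like: a `cache_key_fn` is called with the task run context and a dict of the task's parameters, and returns a string key. The helper names below (`key_for_week`, `weekly_cache_key`) are made up for illustration and are not part of Prefect. The idea, under that assumption, is to fold the ISO calendar week into the key so a cached result stops matching once a new week starts.

```python
import hashlib
import json
from datetime import date
from typing import Any, Dict


def key_for_week(parameters: Dict[str, Any], day: date) -> str:
    """Hash the task parameters together with the ISO year and week of `day`."""
    iso_year, iso_week, _ = day.isocalendar()
    payload = json.dumps(
        {"parameters": parameters, "week": [iso_year, iso_week]},
        sort_keys=True,
        default=str,  # fall back to str() for values JSON cannot encode
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def weekly_cache_key(context: Any, parameters: Dict[str, Any]) -> str:
    """Two-argument form (context, parameters); the context is ignored here."""
    return key_for_week(parameters, date.today())
```

It would be attached with `@task(cache_key_fn=weekly_cache_key)`. One caveat of the `default=str` fallback: a parameter whose string form changes between runs (for example an object whose repr includes a memory address) produces a different key every time, so nothing is ever reused.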
For more details on caching, you can refer to the following resources:
- [Prefect 2 Migration Guide](https://docs.prefect.io/latest/guides/migration-guide/)
- [Prefect Flow and Task Configuration Tutorial](https://docs.prefect.io/latest/tutorial/flow-task-config/)
I hope this helps you with your caching endeavors. Remember, a cached DataFrame is a happy DataFrame. Or something like that.
Jimmy Le
05/26/2023, 3:39 PM
Marvin
05/26/2023, 3:41 PM
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=7))
def download_large_file():
    # Your code to download the large file
    pass


@task
def process_large_file(file):
    # Your code to process the large file and return a Pandas DataFrame
    pass


@flow
def my_flow():
    large_file = download_large_file()
    dataframe = process_large_file(large_file)


if __name__ == "__main__":
    my_flow()
In this example, the `download_large_file` task is set to cache its output for 7 days. This means that when the flow runs daily, it will reuse the cached output of the `download_large_file` task for a week before downloading the file again.
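To make the expiration behavior concrete, here is a toy in-memory model of a cache key plus an expiration window. It is not how Prefect stores results; `ToyCache`, `fake_download`, and the explicit `now` argument are hypothetical names used only to show that a lookup inside the window reuses the stored value and a lookup after it runs the work again.

```python
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Tuple


class ToyCache:
    """Maps a cache key to (value, expiry time)."""

    def __init__(self, expiration: timedelta) -> None:
        self.expiration = expiration
        self._entries: Dict[str, Tuple[Any, datetime]] = {}

    def get_or_compute(self, key: str, compute: Callable[[], Any], now: datetime) -> Any:
        entry = self._entries.get(key)
        if entry is not None and now < entry[1]:
            return entry[0]  # still fresh: reuse the stored value
        value = compute()  # missing or expired: run the work again
        self._entries[key] = (value, now + self.expiration)
        return value


downloads = []  # records each simulated download


def fake_download() -> str:
    downloads.append("downloaded")
    return "BigFile.zip"


cache = ToyCache(expiration=timedelta(days=7))
start = datetime(2023, 5, 26)
for day in range(9):  # simulate nine daily flow runs
    cache.get_or_compute("download_large_file", fake_download, start + timedelta(days=day))

print(len(downloads))  # 2: one download on day 0, one more on day 7
```

In this model the entry written on day 0 is treated as expired exactly seven days later, so nine daily runs trigger two downloads.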
Remember, with great data comes great responsibility... or something like that.
Nate
05/26/2023, 3:41 PM
Jimmy Le
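05/26/2023, 3:45 PM
import os
import time
import zipfile
from datetime import timedelta

import pandas as pd
from prefect import task
from prefect.tasks import task_input_hash
from selenium.webdriver.common.by import By


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=7))
def download_large_file(driver):
    # use the selenium webdriver to start the file download
    driver.find_element(By.XPATH, "XPATH").click()
    # wait for download to complete
    while not os.path.exists("BigFile.zip"):
        time.sleep(60)
    # unzip file
    with zipfile.ZipFile("BigFile.zip", "r") as zip_ref:
        ...
    # load the extracted file into a DataFrame
    df = pd.read_csv("BigFile.csv")
    return df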
Nate
05/26/2023, 3:46 PM
Marco Barbero Mota
05/26/2023, 5:48 PM