# prefect-community
m
Hi - I'm using cloud and docker storage. I'm wrapping my head around "automatic" input caching specifically as it relates to task retries. According to the docs "Input caching is an automatic caching. Prefect will automatically apply it whenever necessary." This caching happens within our container and not on your machines, correct? Are there limits to the size of this cache (like if a task takes as an input some large dataframe generated by an upstream task)?
c
Hi Mark, ultimately how data is cached / stored depends on the result handler configuration you are running with. For example:
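from datetime import timedelta
from prefect import task
from prefect.engine.result_handlers import LocalResultHandler, S3ResultHandler

@task(result_handler=LocalResultHandler(dir="/my/data"))
def task_a():
    return 42

# max_retries and retry_delay are task arguments, not S3ResultHandler arguments
@task(result_handler=S3ResultHandler(bucket="my-data"), max_retries=2, retry_delay=timedelta(seconds=2))
def task_b(x):
    return x + 1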
If `task_b` goes into a `Retrying` state, `task_a`'s result handler will be called to store that data somewhere. In this case that is your local file system; if running in Docker, this requires a volume mount of some kind to make sure the data is persisted outside of the ephemeral container. The only thing sent to Cloud will be the file path of `task_a`'s data, not the data itself.
Also note that you can configure a “global” result handler for your Flow by providing a result handler to your Flow’s initialization, which will then be used for all tasks unless it’s overridden.
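To make the "only the file path is sent, not the data" point concrete, here is a minimal sketch of what a local result handler does conceptually. `SketchLocalHandler` is a hypothetical stand-in written for illustration, not Prefect's `LocalResultHandler`; it assumes the result can be pickled and that the directory is writable.

```python
import pickle
import tempfile
from pathlib import Path


class SketchLocalHandler:
    """Hypothetical stand-in for a local result handler (not Prefect's class)."""

    def __init__(self, directory):
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)

    def write(self, result):
        """Persist the result to disk and return only its location."""
        with tempfile.NamedTemporaryFile(
            dir=self.directory, suffix=".pkl", delete=False
        ) as f:
            pickle.dump(result, f)
            return f.name

    def read(self, location):
        """Load the result back from the stored location."""
        with open(location, "rb") as f:
            return pickle.load(f)


handler = SketchLocalHandler(tempfile.mkdtemp())
location = handler.write(list(range(1000)))  # stands in for a large upstream result
restored = handler.read(location)  # what a retry in a new process would do
```

In a setup like this, the only thing that has to travel between processes is the `location` string, so any size limit comes from the storage you write to rather than from the state itself.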
m
so - that's where I'm getting confused. The docs say that input caching is "automatic". And that's proven to me by this little flow I've been testing locally (not yet tested in prefect cloud). All tasks end up succeeding, meaning that their upstream inputs are being cached. However, I do not have a result handler defined.
from datetime import timedelta
from random import randint

from prefect import task, Flow


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def extract():
    """Get a list of data"""
    return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def transform(data):
    """Multiply the input by 10"""
    value = randint(0, 10)
    if value >= 8:
        raise Exception
    else:
        return data * 10


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def load(data):
    """Print the data to indicate it was received"""
    print("Here's your data: {}".format(data))


with Flow('ETL') as flow:
    e = extract()
    t = transform.map(e)
    l = load.map(t)


if __name__ == '__main__':
    flow.run()
so to have input caching, do you or don't you need a result handler defined? If you do need it, why does this toy flow work?
c
Yea, when running in Core alone result handlers are not needed since all retries occur within the same process. In this situation the “cache” is stored directly on the `Retrying` state object (on the `cached_inputs` attribute). Result handlers become necessary anytime a retry occurs in a new process (which typically only occurs when running in Cloud in special circumstances, for example a manual rerun of a task).
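As a rough illustration of why the toy flow works without a result handler, here is a small sketch of an in-process retry loop that keeps the inputs on a state object. `SketchRetrying` and `run_with_retries` are hypothetical names made up for this example, not Prefect's internals, and the retry delay is left out for brevity.

```python
class SketchRetrying:
    """Hypothetical stand-in for a Retrying state that carries its inputs."""

    def __init__(self, cached_inputs, run_count):
        self.cached_inputs = cached_inputs
        self.run_count = run_count


def run_with_retries(fn, inputs, max_retries):
    """Run fn(**inputs); on failure, retry using the inputs held on the state."""
    state = SketchRetrying(cached_inputs=dict(inputs), run_count=0)
    while True:
        state.run_count += 1
        try:
            return fn(**state.cached_inputs), state.run_count
        except Exception:
            if state.run_count > max_retries:
                raise
            # The upstream task is not re-run; the next attempt
            # simply reuses state.cached_inputs from memory.


attempts = {"n": 0}


def flaky_transform(data):
    """Fail on the first two calls, then succeed."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return data * 10


result, runs = run_with_retries(flaky_transform, {"data": 4}, max_retries=3)
```

Because the state object lives in memory, this only works while the retry happens in the same process; once a new process picks up the retry, the inputs have to be read back from wherever a result handler stored them.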
m
ah ok got it. Thanks @Chris White!
c
anytime! glad I could help