Mark McDonald
04/02/2020, 8:11 PM
Chris White
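from datetime import timedelta
from prefect import task
from prefect.engine.result_handlers import LocalResultHandler, S3ResultHandler

# task_a's return value is stored on the local file system
@task(result_handler=LocalResultHandler(dir="/my/data"))
def task_a():
    return 42

# retry settings belong to the task, not to the result handler
@task(result_handler=S3ResultHandler(bucket="my-data"), max_retries=2, retry_delay=timedelta(seconds=2))
def task_b(x):
    return x + 1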
If task_b goes into a Retrying state, task_a's result handler will be called to store that data somewhere (in this case, on your local file system; if running in Docker, this requires a volume mount of some kind so that the data persists outside the ephemeral container). The only thing sent to Cloud is the file path of task_a's data, not the data itself.
Chris White
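To make the "only the file path is sent" point concrete, here is a minimal sketch of the write/read round trip a local result handler performs. It does not use Prefect; the class name SketchLocalHandler, the pickle format, and the file naming are all assumptions made for illustration, not Prefect's implementation.

```python
import os
import pickle
import tempfile
import uuid


class SketchLocalHandler:
    """Hypothetical stand-in for a local result handler (not Prefect's class)."""

    def __init__(self, dir):
        self.dir = dir
        os.makedirs(dir, exist_ok=True)

    def write(self, result):
        """Persist the result to disk and return only its location."""
        path = os.path.join(self.dir, "result-{}.pkl".format(uuid.uuid4().hex))
        with open(path, "wb") as f:
            pickle.dump(result, f)
        return path

    def read(self, path):
        """Load a previously stored result back from its location."""
        with open(path, "rb") as f:
            return pickle.load(f)


# A temporary directory stands in for a mounted volume such as /my/data.
handler = SketchLocalHandler(dir=tempfile.mkdtemp())
location = handler.write(42)       # the value task_a returned
restored = handler.read(location)  # what a retried task_b would receive
```

In this sketch, only the string in location would need to leave the machine; the value 42 stays in the file on disk.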
Mark McDonald
04/02/2020, 8:18 PM
from datetime import timedelta
from random import randint
from prefect import task, Flow

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def extract():
    """Get a list of data"""
    return [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def transform(data):
    """Multiply the input by 10"""
    value = randint(0, 10)
    if value >= 8:
        raise Exception
    else:
        return data * 10

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def load(data):
    """Print the data to indicate it was received"""
    print("Here's your data: {}".format(data))

with Flow('ETL') as flow:
    e = extract()
    t = transform.map(e)
    l = load.map(t)

if __name__ == '__main__':
    flow.run()
Mark McDonald
04/02/2020, 8:19 PM
Chris White
Retrying state object (on the cached_inputs attribute). Result handlers become necessary any time a retry occurs in a new process (which typically happens only in special circumstances when running in Cloud, for example a manual rerun of a task).
Mark McDonald
04/02/2020, 8:20 PM
Chris White