latif
11/22/2022, 8:07 AM
Khuyen Tran
11/22/2022, 8:52 PM
from prefect import flow, task
@task
def extract():
    """Return the extracted data.

    The extraction itself is elided in this sketch: the value returned is
    the ``...`` placeholder.
    """
    raw = ...  # placeholder for the real extraction logic
    return raw
@task
def create_chunks(data, n):
    """Turn ``data`` into a list of chunks, each holding several rows.

    The splitting logic is elided in this sketch (``...`` placeholder), so
    neither ``data`` nor ``n`` is read yet.

    NOTE(review): ``n`` presumably controls the chunking (chunk size or
    chunk count) -- confirm once the body is implemented.
    """
    chunks = ...  # placeholder for the real chunking logic
    return chunks
@task
def transform(data):
    """Transform ``data`` and return the transformed result.

    The transformation itself is elided in this sketch (``...``
    placeholder), so ``data`` is not read yet.
    """
    # The assigned name must match the returned name; the original bound
    # `transform_data` but returned `transformed_data`, raising NameError.
    transformed_data = ...  # placeholder for the real transformation
    return transformed_data
@task
def aggregate_chunks(data):
    """Return the aggregated list of data built from ``data``.

    The aggregation itself is elided in this sketch (``...`` placeholder),
    so ``data`` is not read yet.
    """
    combined = ...  # placeholder for the real aggregation logic
    return combined
@task
def load(data):
    """Load ``data`` into its destination.

    Stub: the body is intentionally empty in this sketch, so nothing is
    done with ``data`` and ``None`` is returned.
    """
@flow
def etl(n=1):
    """Run the extract -> chunk -> transform -> aggregate -> load pipeline.

    Args:
        n: Forwarded unchanged to ``create_chunks`` as its ``n`` argument.
            NOTE(review): the default of 1 is a placeholder so that existing
            ``etl()`` calls keep working; confirm the intended meaning and a
            sensible default once ``create_chunks`` is implemented.
    """
    extracted_data = extract.submit()
    # `create_chunks` requires `n`; the original call omitted it, which
    # raised a TypeError for the missing argument.
    chunks = create_chunks.submit(extracted_data, n)
    # `.map` fans `transform` out over the chunk list.
    transformed_data = transform.map(chunks)
    # Called directly (not submitted), so the aggregated result is
    # available here before `load` is scheduled.
    aggregated = aggregate_chunks(transformed_data)
    load.submit(aggregated)