latif

11/22/2022, 8:07 AM
Hello everyone. This is more of a general question than a specific issue. I have a set of requirements and I'm unsure whether Prefect is suitable for them (perhaps ETL tools in general aren't). I need to aggregate data from different sources, process it in chunks, and then stream the chunks to a browser for each client.

I think conceptually there are 2 ways to map this onto Prefect:
(1) Have tasks for aggregation, and then a separate long-lived task that handles chunking, processing, and streaming. In this case one flow takes care of the entire job.
(2) Have tasks for aggregation, then a subflow for each chunk, with tasks for processing the chunk and a separate one for streaming it back.

Both approaches do something that maybe Prefect isn't designed for, and I don't know enough about Prefect's internals to understand the performance/space cost of a single flow, or the potential problems of a really long-lived task. (1) would have a really long-lived task, maybe even days. (2) would have a huge number of subflows (there could be around 50,000 chunks) for a single job. I prefer (2) just because (1) feels like it doesn't have the task granularity to really take advantage of Prefect.

I'm open to alternative approaches. Ordinarily I'd reach for something like Kafka for this, but I like getting the benefits of Prefect (logging, observability, retries, etc.) for free.

Khuyen Tran

11/22/2022, 8:52 PM
I think using tasks (option 1) will allow you to run your code faster:
from prefect import flow, task

@task
def extract():
    data = ...
    return data

@task
def create_chunks(data, n):
    # split the data into a list of n chunks;
    # each chunk contains several rows of data
    chunk_list = ...
    return chunk_list

@task
def transform(chunk):
    # do stuff on a single chunk and return the transformed data
    transformed_data = ...
    return transformed_data

@task
def aggregate_chunks(data):
    # returns aggregated list of data
    aggregated = ...
    return aggregated

@task
def load(data):
    # load the aggregated data into its destination
    pass

@flow
def etl(n: int = 100):  # n = number of chunks; 100 is just an example
    extracted_data = extract.submit()
    chunks = create_chunks.submit(extracted_data, n)
    transformed_data = transform.map(chunks)
    aggregated = aggregate_chunks.submit(transformed_data)
    load.submit(aggregated)
Unless I misunderstood your option 1
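If you do want to go with your option (2), the subflow-per-chunk version would look roughly like this. This is a minimal sketch, not a tested implementation: it reuses the extract and create_chunks tasks from above, and process_chunk, stream_chunk, handle_chunk, and etl_with_subflows are hypothetical names made up for illustration:
from prefect import flow, task

@task
def process_chunk(chunk):
    # transform a single chunk (placeholder)
    processed = ...
    return processed

@task
def stream_chunk(processed):
    # push the processed chunk back to the client (placeholder)
    ...

@flow
def handle_chunk(chunk):
    # one subflow run per chunk: process it, then stream it back
    processed = process_chunk(chunk)
    stream_chunk(processed)

@flow
def etl_with_subflows(n: int = 100):  # n is just an example value
    extracted_data = extract()
    chunks = create_chunks(extracted_data, n)
    # calling a flow inside a flow creates a subflow run, so with
    # ~50,000 chunks this creates ~50,000 flow runs in the backend
    for chunk in chunks:
        handle_chunk(chunk)
Each subflow run shows up separately in the UI, which gives you the granularity you mentioned, but it also means the backend has to track tens of thousands of flow runs for a single job.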