# ask-community
a
Hi community, I am new to Prefect and trying to wrap my head around the framework. I have a simple ETL flow to implement: read a large collection from MongoDB, transform it, and save the result to a MySQL table. The collection is too big to fit in memory, so I need some sort of batching, but I am struggling to understand how. I was thinking of fixing a LIMIT constant, calculating the count of the collection, and producing map tasks over `range(0, count, LIMIT)`.
Pseudo code:
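```python
from prefect import Flow, flatten, task, unmapped
from pymongo import MongoClient

LIMIT = 1000  # batch size (illustrative value)
MONGO_URI = "mongodb://localhost:27017"  # assumed connection string
DB_NAME = "mydb"  # assumed database name


@task
def count_collection_mongo(collection_name):
    # number of documents in the collection
    with MongoClient(MONGO_URI) as client:
        return client[DB_NAME][collection_name].count_documents({})


@task
def load_data(collection_name, skip, limit):
    # fetch one batch; materialize the cursor so the result can be passed between tasks
    with MongoClient(MONGO_URI) as client:
        return list(client[DB_NAME][collection_name].find().skip(skip).limit(limit))


@task
def transform(data):
    print(data.get("name"))
    return data


@task
def calculate_iterations_data(count_result):
    # one skip offset per batch: 0, LIMIT, 2 * LIMIT, ...
    return list(range(0, count_result, LIMIT))


with Flow("ETL flow for mycollection") as flow:
    result_count = count_collection_mongo("mycollection")
    iterations = calculate_iterations_data(result_count)
    # map over the skip offsets; collection name and batch size stay constant
    data = load_data.map(unmapped("mycollection"), skip=iterations, limit=unmapped(LIMIT))
    transform.map(flatten(data))

flow.run()
```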
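The offsets that `range(0, count, LIMIT)` produces can be checked on their own, without Prefect or MongoDB. A minimal sketch with a hypothetical helper, `batch_offsets`, that pairs each skip offset with the size of its batch; there are ceil(count / limit) batches, and the last one is usually partial:

```python
def batch_offsets(count, limit):
    """Return one (skip, size) pair per batch covering `count` documents."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    # range() yields the skip offsets; min() trims the final, partial batch
    return [(skip, min(limit, count - skip)) for skip in range(0, count, limit)]


print(batch_offsets(2500, 1000))  # [(0, 1000), (1000, 1000), (2000, 500)]
```

Passing `LIMIT` unchanged to `.limit()` for the last batch is also fine, since MongoDB simply returns the fewer documents that remain; the `min` only makes the partial batch explicit.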
k
Hi @Avi Haiat! Welcome to Prefect. Just a friendly reminder to post large code blocks in threads so they don't drown out other messages. The way to limit parallelism is task concurrency. I think the `flatten` at the end will collect all the data in one place and cause memory issues. I think the approach here would be two stages of mapping without flattening; two stages of mapping will result in depth-first execution on those chunks.
The 1st stage would pull the data, and the 2nd stage would transform and upload it.
Then also do some garbage collection so you don't hold on to those chunks. Docs for mapping: https://docs.prefect.io/core/concepts/mapping.html
a
Thanks for the answer, it's helpful. Since this seems like a 101 real-world scenario, is there a code example I could get inspiration from?
k
I don’t have a good example, unfortunately, but I feel that as long as you get it working for one chunk of data, it shouldn't be hard to extend.
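Getting one chunk working first can be as simple as a plain function that turns one batch of MongoDB documents into rows, tested on a small in-memory list before any Prefect or database code is involved. A minimal sketch; `transform_batch` and the `(id, name)` row layout are hypothetical, and the only field taken from the question is `name`:

```python
def transform_batch(docs):
    """Turn one batch of MongoDB documents into (id, name) row tuples.

    The (id, name) layout is a hypothetical target table; documents without
    a "name" field get None (stored as NULL in MySQL).
    """
    return [(str(doc["_id"]), doc.get("name")) for doc in docs]


batch = [{"_id": 1, "name": "alice"}, {"_id": 2}]
print(transform_batch(batch))  # [('1', 'alice'), ('2', None)]
```

Once this works for one chunk, the function can be called from a mapped task, and its rows handed to a DB-API cursor, for example PyMySQL's `cursor.executemany("INSERT INTO my_table (id, name) VALUES (%s, %s)", rows)` (the table name is hypothetical).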