# prefect-community
t
Hi! I have a question regarding processing a large number of objects. I’d have a task that keeps yielding object IDs, and those objects should be processed by a per-object task (in parallel). Since there are many objects to be processed (potentially 1MM), object processing should start while object IDs are still being fetched: I wouldn’t want to have an in-memory list of all the object IDs. See below for pseudocode. Is it possible to model this with Prefect? If so, how? Thanks for your help!
```python
def fetch_object_ids():
  # Long-running task that yields many object IDs.
  cursor = None
  while True:
    # Keep fetching and yielding object IDs until we're exhausted.
    obj_ids, cursor = external_request_to_fetch_object_ids(cursor)
    for obj_id in obj_ids:
      yield obj_id
    if cursor is None:
      break

def process_object(object_id):
  do_something_with_object(object_id)

with Flow("Process Objects") as flow:
  object_ids = fetch_object_ids()  # I don't want to wait here / keep this list in memory
  process_object.map(object_ids)
  send_finished_email_to_user()  # Should depend on all objects processed.
```
d
Hey @tom! This would be a pretty cool feature and I’ve passed your feedback along to the team
In the meantime, I’m not sure there’s a way to start processing downstream dependencies before upstream ones complete
t
Thanks! How would you suggest modeling this process meanwhile? Is there some other way it could be batched?
d
That takes a deep dive
t
Yep, I saw that.
d
If it’s possible to know how many records there are ahead of time, you can generate a list of configuration objects and map over that
Which can take advantage of DFE (depth-first execution)
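As an illustration (not from the thread), here is a minimal sketch of the “configuration objects” idea in Prefect 1.x; the names build_batch_configs, process_batch, total_count, and batch_size are hypothetical:
```python
from prefect import Flow, task


@task
def build_batch_configs(total_count, batch_size):
    # One small "configuration object" (a dict) per batch of records.
    return [
        {"offset": offset, "limit": batch_size}
        for offset in range(0, total_count, batch_size)
    ]


@task
def process_batch(config):
    # Hypothetical per-batch worker: fetch and process the records in the
    # window config["offset"] .. config["offset"] + config["limit"].
    pass


with Flow("Batched Processing") as flow:
    configs = build_batch_configs(total_count=1_000_000, batch_size=5_000)
    process_batch.map(configs)
```
Each mapped child only carries a small dict, so even a very large record set never has to be held in memory as a list of full objects.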
t
I might have the count, or at least an approximate count. What do you mean by configuration objects?
d
I actually have a flow that does this
one sec
```python
from datetime import timedelta

import pandas as pd
import pendulum
import prefect
from prefect import task


@task(
    name="Get New Record Counts",
    checkpoint=True,
    max_retries=5,
    retry_delay=timedelta(minutes=3),
    tags=["prefect_cloud_database"],
)
def get_new_record_counts(table_data, batch_size, batch_count, pg_connection_string):
    logger = prefect.context.get("logger")

    # extract table name & last updated timestamp
    document_id, table = table_data
    table["document_id"] = document_id

    if not table["last_updated"]:
        table["last_updated"] = pendulum.now().subtract(years=20).to_datetime_string()

    # table_time_sort_field is a helper defined elsewhere; it presumably returns
    # the timestamp column used to filter this table for new records.
    time_sort_field = table_time_sort_field(table["table"])

    # get count of new records
    result = pd.read_sql(
        con=pg_connection_string,
        sql=f"""
        SELECT COUNT(*)
        FROM {table["schema"]}."{table["table"]}"
        WHERE {time_sort_field} > '{table["last_updated"]}';
        """,
    )

    count = result["count"][0]
    logger.info(f"{count} new records")

    # TODO take batch_count into account

    offsets = list(range(0, count, batch_size))
    table_offsets = []

    for offset in offsets:
        table_offset = table.copy()
        table_offset["offset"] = offset
        table_offsets.append(table_offset)

    logger.info(table_offsets)
    # table_offsets = [{ document_id: "", table: "", schema: "", last_updated: "timestamp", offset: 0 }]
    return table_offsets
```
I then map over that list and then map over the result
oh! This is a mapped task
that’s mapping over a list of tables from firestore
t
So you get the offsets and then map a task that processes each chunk of the table.
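For illustration only, roughly how that wiring could look in Prefect 1.x around the get_new_record_counts task above; get_tables and load_batch are hypothetical stand-ins, while flatten and unmapped are the stock Prefect helpers:
```python
from prefect import Flow, task, flatten, unmapped


@task
def get_tables():
    # Hypothetical: return the list of (document_id, table) pairs,
    # e.g. read from Firestore.
    ...


@task
def load_batch(table_offset, pg_connection_string):
    # Hypothetical: load/process one batch of rows using table_offset["offset"]
    # plus the table metadata carried in the same dict.
    ...


with Flow("Batched Table Sync") as flow:
    pg_connection_string = "postgresql://..."  # placeholder

    tables = get_tables()

    # get_new_record_counts is the mapped task shown earlier in the thread.
    # One mapped child per table; each child returns a list of offset configs.
    table_offsets = get_new_record_counts.map(
        table_data=tables,
        batch_size=unmapped(5000),
        batch_count=unmapped(1),
        pg_connection_string=unmapped(pg_connection_string),
    )

    # Flatten the list-of-lists and map again: one child per (table, offset) batch.
    load_batch.map(flatten(table_offsets), unmapped(pg_connection_string))
```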