    tom

    1 year ago
    Hi! I have a question regarding processing a large amount of objects. I’d have a task that keeps yielding object IDs, and those objects should be processed by a per-object task (in parallel). Since there are many objects to be processed (potentially 1MM), object processing should start while object IDs are still being fetched: I wouldn’t want to have an in-memory list of all the object IDs. See below for pseudo code. Is it possible to model this with prefect? If so, how? Thanks for your help!
    def fetch_object_ids():
      # Long running task with many object_ids returning.
      cursor = None
      while True:
        # Keep fetching and yielding object IDs until we're exhausted.
        obj_ids, cursor = external_request_to_fetch_object_ids(cursor)
        for obj_id in obj_ids:
          yield obj_id
        if cursor is None:
          break
    
    def process_object(object_id):
      do_something_with_object(object_id)
    
    with Flow("Process Objects") as flow:
      object_ids = fetch_object_ids()  # I don't want to wait here / keep this list in memory
      process_object.map(object_ids)
      send_finished_email_to_user()  # Should depend on all objects processed.
    Dylan

    1 year ago
    Hey @tom! This would be a pretty cool feature and I’ve passed your feedback along to the team
    In the meantime, I’m not sure there’s a way to start processing downstream dependencies before upstream ones complete
    tom

    1 year ago
    Thanks! How would you suggest modeling this process in the meantime? Is there some other way it could be batched?
    Dylan

    1 year ago
    That takes a deep dive
    tom

    1 year ago
    Yep, I saw that.
    Dylan

    1 year ago
    If it’s possible to know how many records there are ahead of time, you can generate a list of configuration objects and map over that
    Which can take advantage of DFE (depth-first execution)
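    A minimal sketch of that pattern, assuming Prefect 1.x; fetch_object_ids_page is a hypothetical paged version of the fetch above, and the count and batch size are placeholders:
    from prefect import Flow, task

    @task
    def make_batches(total_count, batch_size):
        # One small configuration object per batch; only offsets are held
        # in memory, never the full list of object IDs.
        return [
            {"offset": offset, "limit": batch_size}
            for offset in range(0, total_count, batch_size)
        ]

    @task
    def process_batch(batch):
        # Hypothetical: fetch one page of object IDs, then process each one.
        for obj_id in fetch_object_ids_page(batch["offset"], batch["limit"]):
            do_something_with_object(obj_id)

    @task
    def send_finished_email_to_user():
        ...

    with Flow("Process Objects") as flow:
        batches = make_batches(1_000_000, 10_000)
        processed = process_batch.map(batches)
        send_finished_email_to_user(upstream_tasks=[processed])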
    tom

    1 year ago
    I might have the count, or at least an approximate count. What do you mean by configuration objects?
    Dylan

    1 year ago
    I actually have a flow that does this
    one sec
    import pandas as pd
    import pendulum
    import prefect
    from datetime import timedelta

    from prefect import task

    @task(
        name="Get New Record Counts",
        checkpoint=True,
        max_retries=5,
        retry_delay=timedelta(minutes=3),
        tags=["prefect_cloud_database"],
    )
    def get_new_record_counts(table_data, batch_size, batch_count, pg_connection_string):
        logger = prefect.context.get("logger")
    
        # extract table name & last updated timestamp
        document_id, table = table_data
        table["document_id"] = document_id
    
        if not table["last_updated"]:
            table["last_updated"] = pendulum.now().subtract(years=20).to_datetime_string()
    
        # table_time_sort_field (defined elsewhere) returns the table's time-sort column
        time_sort_field = table_time_sort_field(table["table"])
    
        # get count of new records
        result = pd.read_sql(
            con=pg_connection_string,
            sql=f"""
            SELECT COUNT(*)
            FROM {table["schema"]}."{table["table"]}"
            WHERE {time_sort_field} > '{table["last_updated"]}';
            """,
        )
    
        count = result["count"][0]
        logger.info(f"{count} new records")
    
        # TODO take batch_count into account
    
        offsets = list(range(0, count, batch_size))
        table_offsets = []
    
        for offset in offsets:
            table_offset = table.copy()
            table_offset["offset"] = offset
            table_offsets.append(table_offset)
    
        logger.info(table_offsets)
        # table_offsets = [{ document_id: "", table: "", schema: "", last_updated: "timestamp", offset: 0 }]
        return table_offsets
    I then map over that list and then map over the result
    oh! This is a mapped task
    that’s mapping over a list of tables from Firestore
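    A rough sketch of how that wiring might look, assuming Prefect 1.x, where flatten turns the mapped list-of-lists of offsets into one flat list to map over again; get_tables_from_firestore, load_table_batch, and PG_CONNECTION_STRING are hypothetical stand-ins:
    from prefect import Flow, flatten, unmapped

    with Flow("Sync Tables") as flow:
        # First map: one count/offset task run per table pulled from Firestore.
        tables = get_tables_from_firestore()
        table_offsets = get_new_record_counts.map(
            tables,
            batch_size=unmapped(50_000),
            batch_count=unmapped(1),
            pg_connection_string=unmapped(PG_CONNECTION_STRING),
        )
        # Second map: flatten the per-table lists into one flat list of batches,
        # then run one task per batch.
        load_table_batch.map(
            flatten(table_offsets),
            batch_size=unmapped(50_000),
            pg_connection_string=unmapped(PG_CONNECTION_STRING),
        )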
    tom

    1 year ago
    So you get the offsets and then map a task that processes each chunk of the table.
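    For completeness, a hypothetical sketch of such a chunk task (reusing the imports and the table_time_sort_field helper from the task above), with a stable ORDER BY so the LIMIT/OFFSET pages stay disjoint across task runs:
    @task(max_retries=5, retry_delay=timedelta(minutes=3))
    def load_table_batch(table_offset, batch_size, pg_connection_string):
        time_sort_field = table_time_sort_field(table_offset["table"])

        # Pull only this batch's rows; the ORDER BY keeps pages from overlapping.
        df = pd.read_sql(
            con=pg_connection_string,
            sql=f"""
            SELECT *
            FROM {table_offset["schema"]}."{table_offset["table"]}"
            WHERE {time_sort_field} > '{table_offset["last_updated"]}'
            ORDER BY {time_sort_field}
            LIMIT {batch_size} OFFSET {table_offset["offset"]};
            """,
        )
        do_something_with_batch(df)  # hypothetical downstream processing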