
Nasr Mohamed

05/15/2023, 3:49 PM
Hey guys, I have a streaming pipeline that I built based on this git repo: https://github.com/anna-geller/prefect-streaming. I'm running it on a fairly large EC2 instance (2xlarge), and after running for a couple of hours the instance is dying due to out-of-memory issues. The tasks are set up and defined as follows:
import json

import awswrangler as wr
import boto3
import pandas as pd
from prefect import flow, task

# s3 is assumed to be a boto3 S3 client; the database connection ("engine") and
# the custom JSON serializer ("JSONSerialize") used below are defined elsewhere
s3 = boto3.client("s3")


@task
def fetch_last_processed_fields(s3_bucket, timestamp_folder, table_name):
    try:
        obj = s3.get_object(
            Bucket=s3_bucket, Key=f"{timestamp_folder}/{table_name}_timestamp.json"
        )
        last_processed_fields = json.loads(obj["Body"].read().decode("utf-8"))
        return last_processed_fields
    except s3.exceptions.NoSuchKey:
        return None


@task
def load_query_into_df(query: str, last_processed_fields: dict, limit: int):
    df = pd.read_sql_query(
        query, con=engine, params={**last_processed_fields, "limit": limit}
    )
    return df


@task
def fetch_latest_results_from_data_frame(df: pd.DataFrame, keys: list[str]):
    last_row = df[keys].iloc[-1]
    return last_row.to_dict()

@task
def update_last_processed_fields(
    s3_bucket: str,
    timestamp_folder: str,
    table_name: str,
    new_last_processed_fields: dict,
):
    s3.put_object(
        Body=json.dumps(new_last_processed_fields, default=JSONSerialize),
        Bucket=s3_bucket,
        Key=f"{timestamp_folder}/{table_name}_timestamp.json",
    )


@task
def load_parquet_into_s3(
    df: pd.DataFrame,
    s3_destination_folder: str,
    database: str,
    table_name: str,
    column_types: dict,
):
    wr.s3.to_parquet(
        df=df,
        path=f"{s3_destination_folder}/{table_name}/",
        dataset=True,
        mode="append",
        database=database,
        table=table_name,
        dtype=column_types,
    )
    del df


@flow
def stream_from_pg_s3(
    s3_bucket: str,
    timestamp_folder: str,
    table_name: str,
    s3_destination_folder: str,
    database: str,
    query: str,
    column_types: dict,
    starting_point: dict,
    limit=20000,
    has_index=False,
    fetch_index_query=None,
):
    last_processed_fields = fetch_last_processed_fields(
        s3_bucket, timestamp_folder, table_name
    )

    df = load_query_into_df(query, last_processed_fields, limit)

    new_last_processed_fields = fetch_latest_results_from_data_frame(
        df=df,
        keys=last_processed_fields.keys()
    )

    load_parquet_into_s3(df, s3_destination_folder, database, table_name, column_types)

    update_last_processed_fields(
        s3_bucket, timestamp_folder, table_name, new_last_processed_fields
    )
Here is how I'm calling the flow:
from flows.bulk_stream_pg_data import stream_from_pg_s3
import gc

if __name__ == "__main__":
    while True:
        stream_from_pg_s3(
            s3_bucket="opendata-internal",
            timestamp_folder="v2/timestamps",
            table_name="transfers",
            s3_destination_folder="s3://opendata-internal/v2/data",
            database="transfers",
            query="""
                SELECT * from transfers
                WHERE index_id > %(index_id)s
                LIMIT %(limit)s
            """,
            starting_point={"index_id": 0},
            limit=30000,
        )

        gc.collect()

Bianca Hoch

05/15/2023, 7:03 PM
Hey Nasr! I believe one way to go about this would be to adopt an orchestrator/worker pattern. Essentially, the orchestrator flow would be responsible for partitioning the ingested data into smaller batches and then deploying worker flows that process those batches. Each worker flow would run in a separate container, as opposed to having everything run in the same container. Here's an example of how to achieve this pattern.
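For illustration, here is a minimal sketch of that pattern using Prefect 2's run_deployment, assuming the per-batch logic has already been packaged as its own deployment; the deployment name "stream-worker/process-batch", the index-range parameters, and the batch size are placeholders, not part of the code above:

from prefect import flow
from prefect.deployments import run_deployment


@flow
def orchestrator(table_name: str, start_index: int, end_index: int, batch_size: int = 30000):
    # Split the overall index range into smaller batches and hand each one to a
    # worker flow run created from an existing deployment. If that deployment uses
    # a container-based infrastructure block (e.g. DockerContainer or ECSTask),
    # every batch is processed in its own container, so its memory is released
    # as soon as the run finishes.
    for batch_start in range(start_index, end_index, batch_size):
        run_deployment(
            name="stream-worker/process-batch",  # placeholder deployment name
            parameters={
                "table_name": table_name,
                "start_index": batch_start,
                "limit": batch_size,
            },
        )

The worker flow behind that deployment would contain roughly the body of stream_from_pg_s3, applied to a single batch.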