Nasr Mohamed
05/15/2023, 3:49 PM

import json

import awswrangler as wr
import pandas as pd
from prefect import flow, task

# `s3` (a boto3 S3 client) and `engine` (a connection to the source Postgres
# database) are assumed to be defined elsewhere in this module.


@task
def fetch_last_processed_fields(s3_bucket, timestamp_folder, table_name):
    # Load the checkpoint for this table from S3; return None if no
    # checkpoint object exists yet.
    try:
        obj = s3.get_object(
            Bucket=s3_bucket, Key=f"{timestamp_folder}/{table_name}_timestamp.json"
        )
        last_processed_fields = json.loads(obj["Body"].read().decode("utf-8"))
        return last_processed_fields
    except s3.exceptions.NoSuchKey:
        return None
@task
def load_query_into_df(query: str, last_processed_fields: dict, limit: int):
    # Run the incremental query, binding the checkpoint fields and the row
    # limit as query parameters.
    df = pd.read_sql_query(
        query, con=engine, params={**last_processed_fields, "limit": limit}
    )
    return df


@task
def fetch_latest_results_from_data_frame(df: pd.DataFrame, keys: list[str]):
    # Take the checkpoint columns from the last row of the batch; assumes the
    # DataFrame is non-empty (iloc[-1] raises IndexError on an empty frame).
    last_row = df[keys].iloc[-1]
    return last_row.to_dict()


@task
def update_last_processed_fields(
    s3_bucket: str,
    timestamp_folder: str,
    table_name: str,
    new_last_processed_fields: dict,
):
    # Persist the new checkpoint back to S3, overwriting the previous one.
    s3.put_object(
        Body=json.dumps(new_last_processed_fields, default=JSONSerialize),
        Bucket=s3_bucket,
        Key=f"{timestamp_folder}/{table_name}_timestamp.json",
    )


@task
def load_parquet_into_s3(
    df: pd.DataFrame,
    s3_destination_folder: str,
    database: str,
    table_name: str,
    column_types: dict,
):
    # Append the batch to the Parquet dataset in S3 and register/update the
    # table in the Glue catalog via awswrangler.
    wr.s3.to_parquet(
        df=df,
        path=f"{s3_destination_folder}/{table_name}/",
        dataset=True,
        mode="append",
        database=database,
        table=table_name,
        dtype=column_types,
    )
    del df
@flow
def stream_from_pg_s3(
    s3_bucket: str,
    timestamp_folder: str,
    table_name: str,
    s3_destination_folder: str,
    database: str,
    query: str,
    column_types: dict,
    starting_point: dict,
    limit=20000,
    has_index=False,
    fetch_index_query=None,
):
    # Fall back to the caller-supplied starting point when no checkpoint
    # exists in S3 yet (fetch_last_processed_fields returns None).
    last_processed_fields = (
        fetch_last_processed_fields(s3_bucket, timestamp_folder, table_name)
        or starting_point
    )
    df = load_query_into_df(query, last_processed_fields, limit)
    new_last_processed_fields = fetch_latest_results_from_data_frame(
        df=df,
        keys=list(last_processed_fields.keys()),
    )
    load_parquet_into_s3(df, s3_destination_folder, database, table_name, column_types)
    update_last_processed_fields(
        s3_bucket, timestamp_folder, table_name, new_last_processed_fields
    )
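
The tasks above rely on a module-level `s3` client and `engine` connection that the snippet doesn't show. A minimal sketch of that setup, assuming a boto3 client and a psycopg2 connection (the pyformat placeholders such as %(index_id)s in the query match psycopg2's parameter style); hosts and credentials here are placeholders, not the real configuration:

import os

import boto3
import psycopg2

# Shared S3 client used by the checkpoint tasks.
s3 = boto3.client("s3")

# DBAPI connection for pd.read_sql_query; connection details are read from
# environment variables and are purely illustrative.
engine = psycopg2.connect(
    host=os.environ["PG_HOST"],
    dbname=os.environ["PG_DATABASE"],
    user=os.environ["PG_USER"],
    password=os.environ["PG_PASSWORD"],
)

Recent pandas versions emit a UserWarning when given a raw DBAPI connection instead of a SQLAlchemy connectable, but the query still executes; a SQLAlchemy engine would also work if the placeholders were switched to its :name style.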
import gc

from flows.bulk_stream_pg_data import stream_from_pg_s3

if __name__ == "__main__":
    while True:
        # Assumes the query returns rows in index_id order, so the last row of
        # each batch is a valid checkpoint (an explicit ORDER BY index_id
        # would make that guarantee).
        stream_from_pg_s3(
            s3_bucket="opendata-internal",
            timestamp_folder="v2/timestamps",
            table_name="transfers",
            s3_destination_folder="s3://opendata-internal/v2/data",
            database="transfers",
            query="""
                SELECT * FROM transfers
                WHERE index_id > %(index_id)s
                LIMIT %(limit)s
            """,
            starting_point={"index_id": 0},
            limit=30000,
        )
        gc.collect()
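
Two pieces the snippets gloss over: the `JSONSerialize` helper referenced in update_last_processed_fields, and the `column_types` mapping the flow signature requires (it isn't passed in the runner call above). A hypothetical sketch of both, with column names and types invented purely for illustration:

from datetime import date, datetime
from decimal import Decimal


def JSONSerialize(obj):
    # json.dumps fallback for values that commonly appear in checkpoint
    # fields, such as timestamps and Decimal columns (assumed behavior).
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return str(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


# Athena/Glue types keyed by column name, as expected by wr.s3.to_parquet's
# dtype argument; these columns are examples only.
column_types = {
    "index_id": "bigint",
    "created_at": "timestamp",
    "amount": "double",
}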
Bianca Hoch
05/15/2023, 7:03 PM