# Marc Lipoff — 09/08/2022, 6:30 PM  (chat-transcript header, kept as a comment;
# in the paste it was fused onto the first code line, breaking the syntax)

# Build the time chunks covering the total run duration, one per poll interval.
# NOTE(review): make_chunks appears to return parallel lists suitable for
# Prefect's .map() — confirm against its definition.
last_modified_begin, last_modified_end, partition = make_chunks(
    target_start_time,
    to_timedelta(poll_freq),
    to_timedelta(total_duration_to_run),
)
# lists the files in the chunk (based on modified timestamp)
files = S3ListGracefulAndWait(bucket=s3_bucket_name.run()).map(
    prefix=unmapped("fargate"),
    last_modified_begin=last_modified_begin,
    last_modified_end=last_modified_end,
)
# Read each listed file into a dataframe (mapped task).
df = read_files.map(files)
### and then take the dataframe and push to database, ...
class S3ListGracefulAndWait(S3List):
    """S3List task that waits for its time window to close and skips gracefully.

    Behavior:
      * RETRY (rescheduled at ``last_modified_end``) when the listing window
        has not fully elapsed yet;
      * SKIP when the window produced no files, or when the listing raised.
    """

    def run(
        self,
        prefix: str,
        partition: str,
        last_modified_begin: datetime.datetime,
        last_modified_end: datetime.datetime,
    ) -> list[str]:
        # Window hasn't fully passed yet: reschedule this task to run once it
        # has, so we only ever list a closed time window.
        if last_modified_end >= now_():
            raise signals.RETRY(
                message=f"Going to retry at {last_modified_end}",
                start_time=last_modified_end,
            )

        # using the partitions is important because of the number of files.
        # without at least pre-filtering on dt, the s3list takes way too long
        prefix += "/dt=" + partition

        try:
            log().info("Starting to list s3 files...")
            res = super().run(
                prefix=prefix,
                last_modified_begin=datetime_to_string(last_modified_begin),
                last_modified_end=datetime_to_string(last_modified_end),
            )
            log().info(
                f"S3ListGracefulAndWait run prefix={prefix} last_modified_begin={last_modified_begin} last_modified_end={last_modified_end}. Result={res}"
            )
        except Exception as e:
            # Listing failed: log with traceback and skip instead of failing.
            log().error(e, exc_info=True)
            raise signals.SKIP(
                f"Failed to get s3 files for dt={partition} {last_modified_begin} to {last_modified_end}. {e}"
            )

        # BUGFIX: raise the "no files" SKIP *outside* the try block.  Prefect
        # signals subclass Exception, so in the original the broad
        # ``except Exception`` caught this SKIP and re-raised it with the
        # misleading "Failed to get s3 files" message.
        if not res:
            raise signals.SKIP(
                f"No files available for dt={partition} {last_modified_begin} to {last_modified_end}"
            )
        return [f"s3://{self.bucket}/{x}" for x in res]
# Anna Geller — 09/09/2022, 1:43 AM  (chat-transcript reply header, kept as a comment)