Peter B
06/28/2020, 2:51 AM
I started with a mapped S3Upload/S3Download task for uploading and downloading JSON at each stage. Then I realized that beyond the file size, S3 also charges per request (and I was making requests to check which files had been uploaded at each stage, then downloading the files that were not yet processed). So rather than make potentially several thousand download requests to S3, I came up with a task that compresses a bunch of JSON-serializable objects and uploads that single compressed file to S3 instead. It avoids the mapped S3Upload/S3Download tasks and a lot of requests (and saves a lot of $$$).
I'm happy with the workflow, so there's no real problem at hand, but a few questions out of curiosity:
1. Has anyone done something similar to this?
2. Is there a Prefect task I'm missing that would handle this better?
3. Does anyone have a better way of doing this?
Here's the task (which gets passed to an S3Upload task next in the flow):
import json
import tarfile
from contextlib import closing
from io import BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, List

from prefect import task


@task
def compress_json_serializable_objects(
    json_serializable_objects: List[Dict[str, Any]],
    object_names: List[str],
    compression: str = "xz",
):
    if len(json_serializable_objects) != len(object_names):
        raise ValueError(
            f"json_serializable_objects (len={len(json_serializable_objects)}) "
            f"and object_names (len={len(object_names)}) are not equal"
        )
    with NamedTemporaryFile() as tmp:
        # Pack each serialized object into a single compressed tar archive.
        with tarfile.open(tmp.name, f"w:{compression}") as tf:
            for obj, name in zip(json_serializable_objects, object_names):
                with closing(BytesIO(json.dumps(obj).encode())) as fobj:
                    tarinfo = tarfile.TarInfo(name)
                    tarinfo.size = len(fobj.getvalue())
                    tf.addfile(tarinfo, fileobj=fobj)
        # Read the archive back only after the tar writer is closed and flushed.
        upload_data = Path(tmp.name).read_bytes()
    return upload_data
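For reference, wiring it into a flow looks roughly like this. This is a sketch, not my exact flow: the bucket and key names are made up, and the exact S3Upload run() signature (and whether it expects str or bytes) depends on your Prefect version, so check the docs for yours.

from prefect import Flow, Parameter
from prefect.tasks.aws.s3 import S3Upload

# Hypothetical bucket name; adjust to your setup.
upload = S3Upload(bucket="my-bucket")

with Flow("compress-and-upload") as flow:
    objects = Parameter("json_serializable_objects")
    names = Parameter("object_names")
    archive = compress_json_serializable_objects(objects, names)
    upload(data=archive, key="stage-output.tar.xz")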
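And on the read side, a minimal sketch of the inverse, unpacking the bytes an S3Download task returns back into objects (reusing the imports above; decompress_json_serializable_objects is an illustrative name, not a built-in Prefect task):

@task
def decompress_json_serializable_objects(
    archive_bytes: bytes, compression: str = "xz"
) -> Dict[str, Any]:
    # Open the tar archive directly from memory and decode each member back to JSON.
    objects = {}
    with tarfile.open(fileobj=BytesIO(archive_bytes), mode=f"r:{compression}") as tf:
        for member in tf.getmembers():
            extracted = tf.extractfile(member)
            if extracted is not None:  # skip directories / special entries
                objects[member.name] = json.loads(extracted.read().decode())
    return objects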
Alex Cano
06/28/2020, 3:17 PM