# show-us-what-you-got
Hey all - I'm using Prefect for a side project to learn it and had a question. I've got a workflow that handles several thousand JSON-serializable objects that I need to put on S3, then process in a later step. Originally I was using a mapped `S3Upload`/`S3Download` task for uploading and downloading JSON at each stage. Then I realized that beyond storage size, S3 also charges per request (and I was making requests to check which files had been uploaded at each stage, then downloading the files that were not yet processed). So rather than make potentially several thousand download requests to S3, I came up with a task that compresses a batch of JSON-serializable objects and uploads that one compressed file to S3 instead. That avoids the mapped `S3Upload`/`S3Download` and a lot of requests (and saves a lot of $$$). I'm happy with the workflow, so there's no real problem at hand, but a few questions out of curiosity:
1. Has anyone done something similar to this?
2. Is there a Prefect task I'm missing that would handle this better?
3. Does anyone have a better way of doing this?

Here's the task (which gets passed to an `S3Upload` task next in the flow):
```python
import json
import tarfile
from contextlib import closing
from io import BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any, Dict, List

from prefect import task


@task
def compress_json_serializable_objects(
    json_serializable_objects: List[Dict[str, Any]],
    object_names: List[str],
    compression: str = "xz",
) -> bytes:
    """Pack each object into a single compressed tar archive and return its bytes."""
    if len(json_serializable_objects) != len(object_names):
        raise ValueError(
            f"json_serializable_objects (len={len(json_serializable_objects)}) "
            f"and object_names (len={len(object_names)}) are not equal"
        )
    with NamedTemporaryFile() as tmp:
        with tarfile.open(tmp.name, f"w:{compression}") as tf:
            for obj, name in zip(json_serializable_objects, object_names):
                # Serialize each object to JSON and add it as a member of the archive
                with closing(BytesIO(json.dumps(obj).encode())) as fobj:
                    tarinfo = tarfile.TarInfo(name)
                    tarinfo.size = len(fobj.getvalue())
                    tf.addfile(tarinfo, fileobj=fobj)
        # Read the finished archive back so it can be handed to S3Upload
        upload_data = Path(tmp.name).read_bytes()
    return upload_data
```
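For the later processing step, the inverse is a task along these lines, which unpacks the downloaded archive bytes back into (name, object) pairs. This is only a sketch: the task name `decompress_json_serializable_objects` and its exact signature are illustrative, not taken from the flow above.

```python
import json
import tarfile
from io import BytesIO
from typing import Any, Dict, List, Tuple

from prefect import task


@task
def decompress_json_serializable_objects(
    archive_bytes: bytes,
    compression: str = "xz",
) -> List[Tuple[str, Dict[str, Any]]]:
    """Unpack a tar archive produced by the task above into (name, object) pairs."""
    results = []
    with tarfile.open(fileobj=BytesIO(archive_bytes), mode=f"r:{compression}") as tf:
        for member in tf.getmembers():
            fobj = tf.extractfile(member)
            if fobj is None:  # skip directories and other non-file entries
                continue
            results.append((member.name, json.loads(fobj.read().decode())))
    return results
```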
Specifically on the "has anyone done something similar to this" question: I definitely have, in a previous role managing a data lake. I originally tried to break my files up into blocks of 100k records, but quickly ran into your exact issue of request counts and tiny file sizes. I switched from targeting 100k records to targeting a compressed file size of about 500 MB per file. I found that was a good sweet spot for the data volume I was dealing with, and I still got decent performance sending it over the wire.

There isn't currently a Prefect task that handles exactly what you're trying to accomplish (compressing data and saving that), but it does remind me of Results, specifically this portion: https://docs.prefect.io/core/concepts/results.html#persisting-user-created-results. AFAIK there isn't an interface or option to compress data before saving it as a result directly, but you can probably accomplish what you're looking for by hooking into that API. In addition, it might be / feel hacky, but you could implement the compression / decompression using a custom serializer (https://docs.prefect.io/core/concepts/results.html#choose-a-serializer). I haven't used this myself, so mileage may vary!
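If you did go the custom-serializer route, I'd imagine it looks roughly like this. This is an untested sketch: the class name, the `lzma` choice, and the bucket name are placeholders, built on the `Serializer`/`JSONSerializer` interface and `S3Result` from those docs.

```python
import lzma

from prefect.engine.results import S3Result
from prefect.engine.serializers import JSONSerializer, Serializer


class CompressedJSONSerializer(Serializer):
    """JSON-serialize a value, then xz-compress the bytes before they are persisted."""

    def __init__(self):
        self._json = JSONSerializer()

    def serialize(self, value) -> bytes:
        # Delegate JSON encoding, then compress the resulting bytes
        return lzma.compress(self._json.serialize(value))

    def deserialize(self, value: bytes):
        # Reverse the two steps: decompress, then JSON-decode
        return self._json.deserialize(lzma.decompress(value))


# Hypothetical usage: attach it to a task's result so Prefect compresses on write.
# @task(result=S3Result(bucket="my-bucket", serializer=CompressedJSONSerializer()))
# def my_task(...):
#     ...
```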