# ask-marvin
@Marvin if I need to pass a PIL image between tasks in a flow, is there a way to create a custom serializer? Are there any limits on the size of assets passed between tasks? I am using Prefect Cloud.
**Short answer**
- Yes, you can pass a `PIL.Image.Image` between tasks. If everything runs in a single worker/process, you can just return the object.
- If results need to cross process boundaries (e.g., retries, distributed task runners, multiple workers) or be persisted, define a serializer or use the built-in pickle serializer and persist results to remote storage.
- Size limits: Prefect Cloud does not move large results through the API when you persist them; only a small reference is stored. Without persistence, large results can hit API/DB limits, so keep them small or persist them.

**Details and examples (Prefect 3.x)**

**Option A: keep it simple (single worker/process)**
- Just return the `PIL.Image.Image` from one task and accept it in the next. No extra config needed. This won't scale if your tasks are distributed or if you need retries that reload results. A minimal sketch follows.
**Option B: persist results with a proper serializer (recommended)**
- Either use `PickleSerializer()` (works out of the box for `PIL.Image`) or create a custom serializer that stores images as PNG/JPEG bytes. Persist results to S3/GCS/Azure to avoid size limits and make retries/distribution safe. The pickle route is a one-liner; a sketch follows.
Custom serializer example (PNG):
```python
from io import BytesIO
from PIL import Image
from prefect import task, flow
from prefect.serializers import Serializer
from prefect_aws.s3 import S3Bucket  # or your preferred storage block


class PILPNGSerializer(Serializer):
    type = "pil-png"

    def dumps(self, obj):
        if not isinstance(obj, Image.Image):
            raise TypeError("Expected PIL.Image.Image")
        buf = BytesIO()
        obj.save(buf, format="PNG")
        return buf.getvalue()  # bytes

    def loads(self, data):
        img = Image.open(BytesIO(data))
        img.load()  # force a full decode so the image no longer depends on the buffer
        return img


# Load a storage block you’ve configured in Prefect (Cloud or local)
s3 = S3Bucket.load("my-results-bucket")  # make sure this block exists

pil_serializer = PILPNGSerializer()

@task(persist_result=True, result_storage=s3, result_serializer=pil_serializer)
def make_image() -> Image.Image:
    # ... create a PIL image ...
    img = Image.new("RGB", (512, 512), color="magenta")
    return img

@task(persist_result=True, result_storage=s3, result_serializer=pil_serializer)
def consume_image(img: Image.Image) -> int:
    # Do whatever you need; return something small if possible
    return img.size[0] * img.size[1]

@flow
def pipeline():
    img = make_image()
    pixels = consume_image(img)
    return pixels
```
**Notes on size limits**
- With persistence: your image bytes go to object storage (S3/GCS/etc.) and Prefect Cloud stores a small reference. The practical limit is whatever your storage can handle.
- Without persistence: Prefect attempts to include results in state. Large binary objects can hit API/DB limits and cause warnings or failures. As a rule of thumb, don't rely on embedding large results in state; persist them.
- Best practice for very large assets: write to storage yourself and pass a URI/key between tasks. It's faster, lighter on memory, and avoids moving big blobs around. A sketch of this pattern closes out this message.

If you share:
- whether you're running a single worker vs. distributed,
- expected image sizes,
- which storage (S3/GCS/Azure/local) you prefer,
I'll tailor the example and configuration so you don't have to guess.
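And for the pass-a-URI pattern from the notes above, a minimal sketch using the same assumed `my-results-bucket` block; the key layout and task names are hypothetical:
```python
from io import BytesIO

from PIL import Image
from prefect import flow, task
from prefect_aws.s3 import S3Bucket

@task
def produce_image_to_s3(key: str) -> str:
    s3 = S3Bucket.load("my-results-bucket")
    img = Image.new("RGB", (4096, 4096), color="magenta")
    buf = BytesIO()
    img.save(buf, format="PNG")
    s3.write_path(key, buf.getvalue())
    return key  # only the small key crosses the task boundary

@task
def consume_image_from_s3(key: str) -> int:
    s3 = S3Bucket.load("my-results-bucket")
    img = Image.open(BytesIO(s3.read_path(key)))
    return img.size[0] * img.size[1]

@flow
def uri_pipeline() -> int:
    key = produce_image_to_s3("images/frame-0001.png")
    return consume_image_from_s3(key)
```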