Short answer
- You can pass a PIL.Image between tasks. If everything runs in a single worker/process, you can just return the object.
- If results need to cross process boundaries (e.g., retries, distributed task runners, multiple workers) or be persisted, define a serializer or use the built-in pickle serializer and persist results to remote storage.
- Size limits: Prefect Cloud does not move large results through the API when you persist them; only a small reference is stored. Without persistence, large results can hit API/DB limits — keep them small or persist them.
Details and examples (Prefect 3.x)
Option A: keep it simple (single worker/process)
- Just return the PIL.Image.Image from one task and accept it in the next. No extra config needed; a minimal sketch follows. This won't scale if your tasks are distributed or if you need retries that reload results.
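A minimal sketch of Option A (task names are illustrative; with no persistence configured, the image object is simply handed between tasks in memory):

```python
from PIL import Image, ImageOps
from prefect import flow, task

@task
def make_image() -> Image.Image:
    # Stand-in for real image creation
    return Image.new("RGB", (512, 512), color="magenta")

@task
def invert_image(img: Image.Image) -> Image.Image:
    # The PIL object arrives in memory; no serialization involved
    return ImageOps.invert(img)

@flow
def simple_pipeline() -> tuple[int, int]:
    img = make_image()
    return invert_image(img).size
```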
Option B: persist results with a proper serializer (recommended)
- Either use PickleSerializer() (works out of the box for PIL.Image) or create a custom serializer that stores images as PNG/JPEG bytes. Persist results to S3/GCS/Azure to avoid size limits and make retries/distribution safe. A sketch of the pickle route follows; the custom serializer example comes after it.
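For the pickle route, a minimal sketch, assuming you've already created an S3Bucket block named my-results-bucket (the string shorthand result_serializer="pickle" also works):

```python
from PIL import Image
from prefect import flow, task
from prefect.serializers import PickleSerializer
from prefect_aws.s3 import S3Bucket

s3 = S3Bucket.load("my-results-bucket")  # pre-configured storage block

@task(persist_result=True, result_storage=s3, result_serializer=PickleSerializer())
def make_image() -> Image.Image:
    return Image.new("RGB", (512, 512), color="magenta")

@flow
def pickle_pipeline() -> Image.Image:
    return make_image()
```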
Custom serializer example (PNG)
```python
from io import BytesIO

from PIL import Image
from prefect import task, flow
from prefect.serializers import Serializer
from prefect_aws.s3 import S3Bucket  # or your preferred storage block


class PILPNGSerializer(Serializer):
    """Stores PIL images as PNG bytes."""

    type: str = "pil-png"

    def dumps(self, obj) -> bytes:
        if not isinstance(obj, Image.Image):
            raise TypeError("Expected PIL.Image.Image")
        buf = BytesIO()
        obj.save(buf, format="PNG")
        return buf.getvalue()  # bytes

    def loads(self, data: bytes) -> Image.Image:
        return Image.open(BytesIO(data))


# Load a storage block you’ve configured in Prefect (Cloud or local)
s3 = S3Bucket.load("my-results-bucket")  # make sure this block exists
pil_serializer = PILPNGSerializer()


@task(persist_result=True, result_storage=s3, result_serializer=pil_serializer)
def make_image() -> Image.Image:
    # ... create a PIL image ...
    img = Image.new("RGB", (512, 512), color="magenta")
    return img


# This task returns an int, so it keeps the default serializer;
# the PNG serializer above would reject a non-image result.
@task(persist_result=True, result_storage=s3)
def consume_image(img: Image.Image) -> int:
    # Do whatever you need; return something small if possible
    return img.size[0] * img.size[1]


@flow
def pipeline():
    img = make_image()
    pixels = consume_image(img)
    return pixels
```
Notes on size limits
- With persistence: your image bytes go to object storage (S3/GCS/etc.) and Prefect Cloud stores a small reference. Practical limit is whatever your storage can handle.
- Without persistence: Prefect attempts to include results in state. Large binary objects can hit API/DB limits and cause warnings or failures. As a rule of thumb, don’t rely on embedding large results in state; persist them.
- Best practice for very large assets: write to storage yourself and pass a URI/key between tasks (see the sketch below). It’s faster, lighter on memory, and avoids moving big blobs around.
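A minimal sketch of that pattern, reusing the my-results-bucket block from above (the key name and image are illustrative):

```python
from io import BytesIO

from PIL import Image
from prefect import flow, task
from prefect_aws.s3 import S3Bucket

@task
def write_image(key: str = "images/frame-0001.png") -> str:
    img = Image.new("RGB", (4096, 4096), color="navy")  # stand-in for your real image
    buf = BytesIO()
    img.save(buf, format="PNG")
    S3Bucket.load("my-results-bucket").write_path(key, buf.getvalue())
    return key  # only this small string moves between tasks

@task
def read_image(key: str) -> int:
    data = S3Bucket.load("my-results-bucket").read_path(key)
    img = Image.open(BytesIO(data))
    return img.size[0] * img.size[1]

@flow
def uri_pipeline() -> int:
    return read_image(write_image())
```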
If you share:
- whether you’re running a single worker vs. distributed,
- expected image sizes,
- which storage (S3/GCS/Azure/local) you prefer,
I’ll tailor the example and configuration so you don’t have to guess.