Gabor Hosszu
07/10/2025, 10:29 AM
from prefect.deployments import run_deployment

class ComplexObj:
    def __init__(self):
        ...
    def method1(self):
        ...
    # etc.

my_object = ComplexObj()

my_flow_run = run_deployment(
    name="my-flow",
    parameters={
        "some_str": "just_a_string",
        "some_int": 25,
        "some_complex_obj": my_object,
    },
    timeout=0,
)
Let's assume "my-flow" is implemented in an entirely different codebase and can accept anything as the parameter for "some_complex_obj". Would this work?

Marvin
07/10/2025, 10:30 AM
Passing a custom Python object (like your ComplexObj) directly to a remote deployment would not work out of the box. Here's why and what you can do about it:
1. Serialization Limitations:
- Prefect needs to serialize parameters to send them across the network
- By default, Prefect uses JSON serialization, which can only handle basic Python types (strings, numbers, lists, dictionaries, etc.)
- Custom objects like your ComplexObj cannot be automatically serialized to JSON
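You can see the limitation with the standard library's json module, which is a rough stand-in for what happens when the parameters are JSON-encoded for submission (the class here just mirrors the example above):
python
import json

class ComplexObj:
    def method1(self):
        ...

# Basic types serialize fine
json.dumps({"some_str": "just_a_string", "some_int": 25})

# A custom object does not:
# TypeError: Object of type ComplexObj is not JSON serializable
json.dumps({"some_complex_obj": ComplexObj()})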
2. Solutions:
a) Convert to Serializable Format:
python
# Convert your complex object to a dictionary before passing
my_object_dict = {
    "attribute1": my_object.attribute1,
    "attribute2": my_object.attribute2,
}

my_flow_run = run_deployment(
    name="my-flow",
    parameters={
        "some_str": "just_a_string",
        "some_int": 25,
        "some_complex_obj": my_object_dict,  # Pass the dictionary instead
    },
    timeout=0,
)
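On the receiving side, the flow can then rebuild the object from that dictionary. A minimal sketch, assuming the remote codebase's ComplexObj can be constructed from those attributes (adapt it to the real class):
python
from prefect import flow

# Illustration only: in the remote codebase, ComplexObj is already defined
# there; this stand-in simply accepts the dictionary's attributes
class ComplexObj:
    def __init__(self, attribute1=None, attribute2=None):
        self.attribute1 = attribute1
        self.attribute2 = attribute2

    def method1(self):
        ...

@flow
def my_flow(some_str: str, some_int: int, some_complex_obj: dict):
    # Rebuild the object from the plain dict passed as a parameter
    obj = ComplexObj(**some_complex_obj)
    obj.method1()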
b) Custom Serialization:
python
from prefect.serializers import Serializer

class ComplexObjSerializer(Serializer):
    def dumps(self, obj):
        # Convert your object to bytes
        return your_serialization_logic(obj)

    def loads(self, blob):
        # Convert bytes back to your object
        return your_deserialization_logic(blob)
3. Best Practices:
- Keep deployment parameters simple and serializable
- If you need to pass complex objects, consider:
  - Breaking them down into basic types
  - Passing identifiers/references and reconstructing the objects in the flow
  - Using a shared storage system (database, object store) to store complex objects
4. Important Considerations:
- The receiving flow must have access to the ComplexObj class definition
- Any dependencies required by your complex object must be available in the flow's environment
- Consider versioning implications if the object structure changes
Remember: The simpler your parameter structure, the more reliable your deployments will be. Consider whether you really need to pass the entire complex object, or if you can pass just the essential data in a simpler format.

Gabor Hosszu
07/10/2025, 10:34 AM

Marvin
07/10/2025, 10:36 AM
1. First, create and save a storage block for your complex objects:
python
from prefect_aws import AwsCredentials, S3Bucket

# Create and save your storage block
s3_block = S3Bucket(
    bucket_name="your-bucket",
    credentials=AwsCredentials(
        aws_access_key_id="your-key",
        aws_secret_access_key="your-secret",
    ),
)
s3_block.save("my-storage")
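Once saved, the block can be loaded by name (as shown in the next step) from any environment that has prefect_aws installed and access to the same Prefect API, so both the calling code and the remote flow can reach the same bucket without sharing credentials in parameters.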
2. Modify your flow to use object storage:
python
import pickle

from prefect import flow, task
from prefect.deployments import run_deployment
from prefect_aws import S3Bucket

@task
def store_complex_object(obj):
    # Serialize your object
    serialized_obj = pickle.dumps(obj)
    # Get the storage block
    s3_block = S3Bucket.load("my-storage")
    # Generate a unique key for this object
    object_key = f"complex_objects/{obj.__class__.__name__}_{id(obj)}.pkl"
    # Store the object
    s3_block.write_path(object_key, serialized_obj)
    return object_key

@task
def retrieve_complex_object(object_key):
    # Get the storage block
    s3_block = S3Bucket.load("my-storage")
    # Retrieve the serialized object
    serialized_obj = s3_block.read_path(object_key)
    # Deserialize and return
    return pickle.loads(serialized_obj)

@flow
def workflow_with_complex_object():
    # Create your complex object (ComplexObj is your own class from the calling codebase)
    my_object = ComplexObj()
    # Store it and get the reference
    object_key = store_complex_object(my_object)
    # Now you can pass the key to your remote deployment
    remote_flow_run = run_deployment(
        name="my-flow",
        parameters={
            "some_str": "just_a_string",
            "some_int": 25,
            "complex_obj_key": object_key,  # Pass the storage key instead of the object
        },
        timeout=0,
    )
3. In your remote flow, retrieve the object:
python
@flow
def my_flow(some_str: str, some_int: int, complex_obj_key: str):
    # Retrieve the complex object
    complex_obj = retrieve_complex_object(complex_obj_key)
    # Now you can use the object
    complex_obj.method1()
    # ... rest of your flow logic
Important Considerations:
1. Security:
- Ensure proper access controls on your S3 bucket
- Consider encryption at rest for sensitive data
- Use appropriate IAM roles/permissions
2. Performance:
- Large objects might impact flow execution time
- Consider cleanup tasks for temporary objects
- Use appropriate S3 storage classes based on access patterns
3. Reliability:
- Add error handling for storage/retrieval operations
- Consider object expiration policies
- Implement retry logic for network operations (see the sketch after this list)
4. Cost:
- Monitor storage usage and implement cleanup
- Use appropriate S3 storage classes
- Consider implementing TTL (Time To Live) for temporary objects
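For the retry point in item 3 above, a minimal sketch assuming Prefect's task-level retry options (the counts and delays are arbitrary example values):
python
from prefect import task

# Retry the storage/retrieval tasks on transient S3 or network failures
@task(retries=3, retry_delay_seconds=10)
def store_complex_object(obj):
    ...

@task(retries=3, retry_delay_seconds=10)
def retrieve_complex_object(object_key):
    ...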
This approach provides several benefits:
- Handles objects of any size or complexity
- Maintains separation of concerns
- Provides persistence if needed
- Works across different execution environments
- Allows for versioning and tracking of objects
Remember to handle cleanup of stored objects when they're no longer needed, either through explicit deletion tasks or S3 lifecycle policies.