Daniel Leicht
07/02/2025, 8:00 PM
python
from prefect_aws import S3Bucket, MinIOCredentials

minio_creds = MinIOCredentials(
    minio_root_user="user",
    minio_root_password="password",
)
minio_creds.save("minio-creds2")

s3_bucket_block = S3Bucket(
    bucket_name="prefect-flows",
    credentials=MinIOCredentials.load("minio-creds2"),
    bucket_folder="pilot",
    endpoint_url="http://minio-host:9000",
)
s3_bucket_block.save("minio-storage", overwrite=True)

my_flow.from_source(source=s3_bucket_block, entrypoint="prefect_flow.py:my_flow").deploy(
    name="minio-to-rabbitmq",
    work_pool_name="default-pool",  # must already exist
)
Marvin
07/02/2025, 8:00 PM
python
from prefect_aws import S3Bucket, MinIOCredentials

# First create and save the MinIO credentials
minio_creds = MinIOCredentials(
    minio_root_user="user",
    minio_root_password="password",
)
minio_creds.save("minio-creds2")

# Create the S3Bucket block using AWS-style credentials
s3_bucket_block = S3Bucket(
    bucket_name="prefect-flows",
    aws_access_key_id="user",  # Use the same as minio_root_user
    aws_secret_access_key="password",  # Use the same as minio_root_password
    bucket_folder="pilot",
    endpoint_url="http://minio-host:9000",
)
s3_bucket_block.save("minio-storage", overwrite=True)

# Deploy the flow
my_flow.from_source(
    source=s3_bucket_block,
    entrypoint="prefect_flow.py:my_flow",
).deploy(
    name="minio-to-rabbitmq",
    work_pool_name="default-pool",
)
Key changes and additional considerations:
1. Instead of using the MinIOCredentials block in the S3Bucket configuration, provide the credentials directly via aws_access_key_id and aws_secret_access_key
2. Make sure your MinIO server has:
- The bucket "prefect-flows" already created
- The correct permissions set for the user
- The proper CORS configuration if needed
3. Verify that:
- The endpoint URL is accessible from where your code is running
- You're using the correct protocol (http vs https)
- The port number matches your MinIO configuration
If you're still having issues after making these changes, you might want to:
1. Verify the credentials work by trying to list buckets, either with the MinIO client directly or with the boto3 sketch below
2. Check the MinIO server logs for more detailed error messages
3. Ensure there are no network connectivity issues between your application and the MinIO server
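For the first check, if you'd rather stay in Python than install the mc client, a minimal boto3 sketch (boto3 is the library prefect-aws uses under the hood) can confirm the credentials against the same endpoint — assuming the user/password and host from the block above:
python
import boto3

# Point a plain S3 client at MinIO instead of AWS
s3 = boto3.client(
    "s3",
    endpoint_url="http://minio-host:9000",  # assumed MinIO endpoint from above
    aws_access_key_id="user",               # assumed MinIO root user
    aws_secret_access_key="password",       # assumed MinIO root password
)

# If the credentials are valid, this prints every bucket the user can see
for bucket in s3.list_buckets()["Buckets"]:
    print(bucket["Name"])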
Let me know if you need help with any of these additional verification steps.
Daniel Leicht
07/02/2025, 8:02 PM
Marvin
07/02/2025, 8:03 PM
python
from prefect_aws import S3Bucket
from prefect.blocks.system import Secret

# Create the S3Bucket block with explicit credentials
s3_bucket_block = S3Bucket(
    bucket_name="prefect-flows",
    aws_access_key_id="user",  # Use your MinIO user
    aws_secret_access_key="password",  # Use your MinIO password
    bucket_folder="pilot",
    endpoint_url="http://minio-host:9000",
    # Add these important configurations
    aws_region_name="us-east-1",  # MinIO defaults to us-east-1
    _block_document_name="minio-storage",  # Optional: gives the block a name during initialization
)

# Save the block
s3_bucket_block.save("minio-storage", overwrite=True)

# Deploy the flow
my_flow.from_source(
    source=s3_bucket_block,
    entrypoint="prefect_flow.py:my_flow",
).deploy(
    name="minio-to-rabbitmq",
    work_pool_name="default-pool",
)
A few important points:
1. We've removed the MinIOCredentials block entirely as it's not needed - S3Bucket can handle MinIO directly
2. Make sure to set these environment variables in your deployment environment:
bash
export AWS_ACCESS_KEY_ID="user"
export AWS_SECRET_ACCESS_KEY="password"
3. If you're using a self-signed certificate, you might need to add:
python
s3_bucket_block = S3Bucket(
# ... other parameters ...
client_kwargs={"verify": False} # Only if using self-signed certificates
)
4. Verify that:
- Your MinIO server is running and accessible
- The bucket "prefect-flows" exists in your MinIO instance
- The credentials have the correct permissions on the bucket
If you're still having issues, you can test the connection directly before trying to deploy:
python
# Test the connection after creating the block
try:
    s3_bucket_block.list_objects()
    print("Connection successful!")
except Exception as e:
    print(f"Connection failed: {e}")
Let me know if you need any clarification or if you're still experiencing issues.
Daniel Leicht
07/02/2025, 8:08 PM
python
s3_bucket_block = S3Bucket(
    bucket_name="prefect-flows",
    aws_access_key_id="user",  # Use the same as minio_root_user
    aws_secret_access_key="password",  # Use the same as minio_root_password
    bucket_folder="pilot",
    client_kwargs={"verify": False},
    endpoint_url="http://minio-host:9000",
    # Add these important configurations
    aws_region_name="us-east-1",  # MinIO defaults to us-east-1
    _block_document_name="minio-storage",  # Optional: gives the block a name during initialization
)
s3_bucket_block.save("minio-storage", overwrite=True)
s3_bucket_block.list_objects()
Marvin
07/02/2025, 8:08 PM
python
from prefect_aws import S3Bucket

s3_bucket_block = S3Bucket(
    bucket_name="prefect-flows",
    aws_access_key_id="user",
    aws_secret_access_key="password",
    bucket_folder="pilot",
    client_kwargs={
        "verify": False,
        # Add endpoint configuration here
        "endpoint_url": "http://minio-host:9000",
        # Explicitly configure credentials
        "aws_access_key_id": "user",
        "aws_secret_access_key": "password",
    },
    aws_region_name="us-east-1",
    _block_document_name="minio-storage",
)

# Save the block
s3_bucket_block.save("minio-storage", overwrite=True)

# Test the connection
s3_bucket_block.list_objects()
Key changes:
1. Moved the endpoint_url into client_kwargs
2. Added credentials directly in client_kwargs
3. Removed the angle brackets from the URL
4. Kept the credentials at both levels (both in S3Bucket constructor and client_kwargs) to ensure they're properly passed through
If you're still getting the credentials error, try this alternative configuration:
python
from prefect_aws import S3Bucket
import boto3
# Create a boto3 session with explicit credentials
session = boto3.Session(
aws_access_key_id="user",
aws_secret_access_key="password",
region_name="us-east-1"
)
s3_bucket_block = S3Bucket(
bucket_name="prefect-flows",
bucket_folder="pilot",
client_kwargs={
"verify": False,
"endpoint_url": "<http://minio-host:9000>"
},
aws_region_name="us-east-1",
_block_document_name="minio-storage",
boto3_session=session # Use the explicit session
)
# Save the block
s3_bucket_block.save("minio-storage", overwrite=True)
# Test the connection
s3_bucket_block.list_objects()
Also, verify these points:
1. Make sure you're using the correct credentials (the actual MinIO root user and password)
2. Confirm that the MinIO server is accessible at the specified endpoint
3. Ensure the "prefect-flows" bucket exists in your MinIO instance (a programmatic check is sketched after this list)
4. Try accessing MinIO using the MinIO client (mc) to verify the credentials work:
bash
mc config host add myminio http://minio-host:9000 user password
mc ls myminio/prefect-flows
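If point 3 fails because the bucket is missing, a small boto3 sketch can check for it and create it — again assuming the endpoint and credentials used above:
python
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client(
    "s3",
    endpoint_url="http://minio-host:9000",  # assumed MinIO endpoint
    aws_access_key_id="user",
    aws_secret_access_key="password",
)

try:
    # head_bucket raises a ClientError if the bucket is missing or forbidden
    s3.head_bucket(Bucket="prefect-flows")
    print("Bucket exists and is accessible")
except ClientError:
    # MinIO accepts a bare create_bucket call (no region constraint needed)
    s3.create_bucket(Bucket="prefect-flows")
    print("Bucket created")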
Let me know if you're still experiencing issues after trying these solutions.
Daniel Leicht
07/02/2025, 8:29 PM
Marvin
07/02/2025, 8:30 PM
python
from prefect import flow
from prefect_aws import S3Bucket

# First, create and save your S3 storage block
s3_bucket_block = S3Bucket(
    bucket_name="your-bucket",
    aws_access_key_id="your-access-key",  # Optional if using IAM roles
    aws_secret_access_key="your-secret-key",  # Optional if using IAM roles
    bucket_folder="flows",  # Optional subfolder
)
s3_bucket_block.save("my-s3-block", overwrite=True)

# Your flow definition
@flow
def my_flow(param1: str = "default"):
    print(f"Running with {param1}")

# Deploy the flow
my_flow.from_source(
    source=s3_bucket_block,
    entrypoint="path/to/flow_file.py:my_flow",  # Relative to repo root
).deploy(
    name="s3-deployed-flow",
    work_pool_name="your-work-pool",  # Must exist already
    schedule=None,  # Optional: add a schedule if needed
)
Key points to remember:
1. File Structure: Your flow code must be in a Python file, and the entrypoint should point to it using the format path/to/file.py:flow_name
2. Dependencies: Make sure your requirements.txt or setup.py is in the root of your project to ensure workers can install needed packages
3. Work Pool Configuration: The work pool must be created before deployment and configured properly:
bash
prefect work-pool create 'your-work-pool' --type process
4. Common Issues to Avoid (a self-contained flow file is sketched after this list):
- Ensure all imports in your flow file are available to workers
- Keep your flow file as self-contained as possible
- Use relative imports if your flow is part of a package
5. Testing Deployment:
python
# Test that the flow was deployed successfully by triggering a run
from prefect.deployments import run_deployment

flow_run = run_deployment(name="my-flow/s3-deployed-flow")
print(flow_run)
6. Running the Worker:
bash
prefect worker start -p 'your-work-pool'
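To make point 4 concrete, here is a minimal, self-contained flow file of the kind workers can run without extra context — everything it needs is imported or defined in the file itself (the names are illustrative):
python
# flow_file.py - self-contained: all imports and helpers live in this file
from prefect import flow, task

@task
def transform(value: str) -> str:
    return value.upper()

@flow
def my_flow(param1: str = "default"):
    # No references to globals defined in other local modules
    print(f"Running with {transform(param1)}")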
If you're using AWS S3 (not MinIO), you can simplify the configuration by using IAM roles and environment variables:
python
# Using AWS S3 with IAM roles
s3_bucket_block = S3Bucket(
    bucket_name="your-bucket",
    bucket_folder="flows",
)
s3_bucket_block.save("my-s3-block", overwrite=True)
For MinIO specifically, make sure to include the endpoint URL:
python
s3_bucket_block = S3Bucket(
    bucket_name="your-bucket",
    aws_access_key_id="your-access-key",
    aws_secret_access_key="your-secret-key",
    bucket_folder="flows",
    client_kwargs={
        "endpoint_url": "http://your-minio-host:9000",
        "verify": False,  # If using self-signed certs
    },
)
Remember that your workers need:
1. Access to the S3 bucket (see the check sketched below)
2. The same block configuration as your deployment environment
3. All required dependencies installed
4. Network access to both S3 and your Prefect API
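One way to sanity-check the first two points from the worker's own environment is to load the saved block there and list the bucket — a minimal sketch, assuming the block was saved as "my-s3-block":
python
from prefect_aws import S3Bucket

# Load the same block document the deployment references
block = S3Bucket.load("my-s3-block")

# Success here means the worker can reach both the Prefect API
# (to load the block) and the bucket itself
print(block.list_objects())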
Let me know if you need clarification on any of these points!
Daniel Leicht
07/02/2025, 8:32 PM
Marvin
07/02/2025, 8:32 PM
python
from prefect import flow
from prefect_aws import S3Bucket
import os

# Create and save the S3/MinIO block
s3_bucket_block = S3Bucket(
    bucket_name="prefect-flows",
    aws_access_key_id="user",
    aws_secret_access_key="password",
    bucket_folder="flows",
    client_kwargs={
        "endpoint_url": "http://minio-host:9000",
        "verify": False,
    },
)
s3_bucket_block.save("minio-storage", overwrite=True)

@flow
def my_flow():
    print("Hello from flow!")

# IMPORTANT: First, explicitly upload the flow code to MinIO
current_file = os.path.abspath(__file__)  # Gets the current file's path
s3_bucket_block.upload_from_path(
    from_path=current_file,
    to_path="flow_file.py",  # The name it will have in MinIO
)

# Now deploy using the uploaded file
my_flow.from_source(
    source=s3_bucket_block,
    entrypoint="flow_file.py:my_flow",  # Match the uploaded filename
).deploy(
    name="minio-flow",
    work_pool_name="default-pool",
)

# Verify the file exists in MinIO
try:
    files = s3_bucket_block.list_objects()
    print("Files in bucket:", files)
except Exception as e:
    print(f"Error listing files: {e}")
A few important checks to make:
1. Verify the upload worked:
python
# Test uploading and listing files
test_content = "test content"
with open("test.txt", "w") as f:
    f.write(test_content)

# Try uploading a test file
s3_bucket_block.upload_from_path(
    from_path="test.txt",
    to_path="test.txt",
)

# List contents
print(s3_bucket_block.list_objects())
2. If you're working with a more complex project structure, you might need to upload multiple files:
python
def upload_directory(local_path, s3_block, prefix=""):
    for root, dirs, files in os.walk(local_path):
        for file in files:
            if file.endswith(".py"):  # or adjust the pattern as needed
                local_file_path = os.path.join(root, file)
                relative_path = os.path.relpath(local_file_path, local_path)
                s3_path = os.path.join(prefix, relative_path).replace("\\", "/")
                s3_block.upload_from_path(
                    from_path=local_file_path,
                    to_path=s3_path,
                )
                print(f"Uploaded {local_file_path} to {s3_path}")

# Usage:
upload_directory("./your_project_directory", s3_bucket_block)
3. For debugging, you can directly check MinIO using the MinIO Client (mc):
bash
# Configure MinIO client
mc alias set myminio http://minio-host:9000 user password
# List contents of your bucket
mc ls myminio/prefect-flows/flows/
# Check specific file
mc cat myminio/prefect-flows/flows/flow_file.py
4. If you're still having issues, try this more verbose configuration:
python
s3_bucket_block = S3Bucket(
    bucket_name="prefect-flows",
    aws_access_key_id="user",
    aws_secret_access_key="password",
    bucket_folder="flows",
    client_kwargs={
        "endpoint_url": "http://minio-host:9000",
        "verify": False,
        "aws_access_key_id": "user",
        "aws_secret_access_key": "password",
        "config": boto3.