Fernando Müller
07/08/2025, 12:55 PM
Marvin
07/08/2025, 12:55 PM
Marvin
07/08/2025, 12:55 PM
python
from prefect import flow
from prefect.schedules import CronSchedule
@flow
def my_parameterized_flow(value: str = "default"):
print(f"Running with value: {value}")
if __name__ == "__main__":
# Create two different schedules with different parameters
schedules = [
# Run daily at 9 AM with value="morning"
CronSchedule(
cron="0 9 * * *",
params={"value": "morning"}
),
# Run daily at 5 PM with value="evening"
CronSchedule(
cron="0 17 * * *",
params={"value": "evening"}
)
]
# Deploy the flow with multiple schedules
my_parameterized_flow.deploy(
name="multi-schedule-deployment",
schedules=schedules
)
You can also define this in your prefect.yaml file if you prefer a declarative approach:
yaml
deployments:
- name: "multi-schedule-deployment"
schedules:
- cron: "0 9 * * *"
params:
value: "morning"
- cron: "0 17 * * *"
params:
value: "evening"
Each schedule will create flow runs with its specific parameter values at the scheduled time. You can use any of Prefect's schedule types (Cron, Interval, RRule) and combine them as needed.
For more details on schedules and deployment configuration, you can check out:
- Prefect Schedules Documentation
- How to Create Schedules Guide
Fernando Müller
07/08/2025, 12:56 PM
Marvin
07/08/2025, 12:57 PM
Marvin
07/08/2025, 12:57 PM
Fernando Müller
07/08/2025, 1:07 PM
Marvin
07/08/2025, 1:07 PM
Marvin
07/08/2025, 1:09 PM
You can define default parameters for your deployment in your prefect.yaml file:
yaml
deployments:
- name: "my-deployment"
entrypoint: "path/to/your/flow.py:flow_function"
parameters:
parameter_name: "parameter_value"
another_parameter: 42
complex_parameter:
nested_key: "value"
list_example: [1, 2, 3]
# You can also specify parameters per schedule as we discussed earlier
schedules:
- cron: "0 9 * * *"
parameters:
parameter_name: "morning_value"
- cron: "0 17 * * *"
parameters:
parameter_name: "evening_value"
For example, if you have a flow like this:
python
@flow
def my_flow(name: str = "default", count: int = 1, config: dict = None):
print(f"Hello {name}, count is {count}")
if config:
print(f"Config: {config}")
You could configure it in your prefect.yaml like this:
yaml
deployments:
- name: "example-deployment"
entrypoint: "flows.py:my_flow"
parameters:
name: "Alice"
count: 5
config:
setting1: "value1"
setting2: "value2"
When you deploy this configuration using prefect deploy, these parameters will be used as the default values for your deployment. You can still override these parameters when starting a flow run manually through the UI or CLI.
Remember that:
1. Parameters defined in the YAML must match the parameter names in your flow function
2. The values must be valid YAML and match the expected types of your flow parameters
3. These parameters serve as defaults and can be overridden when starting a flow run
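For instance, a default defined in the YAML can be overridden for a single run from the CLI; a sketch using the deployment and parameters from the example above (the flow name is assumed to follow Prefect's default naming, the function name with dashes):
```bash
# Override two of the YAML defaults for a single run
prefect deployment run 'my-flow/example-deployment' -p name="Bob" -p count=10
```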
For more information about deployment configuration, you can refer to the Prefect YAML documentation.
Fernando Müller
07/08/2025, 1:12 PM
Marvin
07/08/2025, 1:12 PM
Marvin
07/08/2025, 1:13 PM
python
from prefect import flow
@flow
def my_flow(required_param: str, optional_param: int = 42):
print(f"Required: {required_param}, Optional: {optional_param}")
In this case, required_param must be provided when running the flow, while optional_param has a default value of 42.
When defining this in your prefect.yaml, you have a few options:
yaml
deployments:
- name: "my-deployment"
entrypoint: "flow.py:my_flow"
parameters:
# Option 1: Provide a default value in the deployment
required_param: "default-value"
optional_param: 100
- name: "my-deployment-no-defaults"
entrypoint: "flow.py:my_flow"
# Option 2: Don't specify parameters at all - they must be provided at runtime
parameters: {}
- name: "my-deployment-partial"
entrypoint: "flow.py:my_flow"
# Option 3: Only specify some parameters
parameters:
optional_param: 200
# required_param must still be provided at runtime
Important notes:
1. If a parameter is required in the flow (no default value) and not specified in the deployment's parameters, it must be provided when starting the flow run
2. You can provide required parameters:
- Through the UI when starting a flow run
- Via the CLI using prefect deployment run deployment/name -p required_param=value
- Through the API when programmatically creating flow runs
3. If you try to run a flow without providing all required parameters, Prefect will raise an error
This flexibility allows you to:
- Make parameters truly required at runtime
- Provide defaults at the deployment level
- Override any parameters (required or optional) when starting a flow run
Fernando Müller
07/11/2025, 1:03 AM
Marvin
07/11/2025, 1:03 AM
Marvin
07/11/2025, 1:04 AM
Yes, you can use the if __name__ == "__main__": pattern with Prefect flows! It's a common and recommended practice, especially when you want to test your flows locally or provide a way to run them directly from the script.
Here's how you can pass parameters to your flow:
python
from prefect import flow
@flow
def my_flow(name: str, count: int = 1):
print(f"Hello {name}, {count} times!")
# You can run it directly like this
if __name__ == "__main__":
my_flow("Alice", count=3) # Pass parameters just like a regular function
You can pass parameters in several ways:
1. Direct function call (as shown above):
python
my_flow("Alice", count=3)
2. Using kwargs dictionary:
python
parameters = {"name": "Alice", "count": 3}
my_flow(**parameters)
3. Mix of positional and keyword arguments:
python
my_flow("Alice", count=3)
# or
my_flow(name="Alice", count=3)
The if __name__ == "__main__": pattern is particularly useful because it:
- Allows you to import the flow in other files without automatically executing it
- Provides a convenient way to test your flows during development
- Follows Python best practices for script organization
When you deploy your flow, you can still specify default parameters or override them at runtime through the Prefect UI or API.
Here's a more complete example:
python
from prefect import flow
@flow
def process_data(dataset_name: str, batch_size: int = 100, debug: bool = False):
print(f"Processing {dataset_name} in batches of {batch_size}")
# Your processing logic here
return f"Processed {dataset_name}"
if __name__ == "__main__":
# For local testing/development
result = process_data(
dataset_name="sales_data",
batch_size=50,
debug=True
)
print(result)
Remember that any parameters you pass to your flow will be automatically tracked in Prefect's UI, making it easy to monitor and debug your flow runs.
Fernando Müller
07/11/2025, 1:06 AM
Marvin
07/11/2025, 1:06 AM
Marvin
07/11/2025, 1:07 AM
1. Direct Execution:
python
@flow
def my_flow(name: str, count: int = 1):
print(f"Hello {name}, {count} times!")
# Run directly with parameters
my_flow("Alice", count=3)
2. Deployment Default Parameters:
python
from prefect import flow
from prefect.deployments import Deployment
@flow
def my_flow(name: str, count: int = 1):
print(f"Hello {name}, {count} times!")
# Set default parameters in the deployment
deployment = flow.deploy(
name="my-deployment",
parameters={"name": "Bob", "count": 5}
)
3. Schedule-Specific Parameters:
python
from datetime import timedelta
# Different parameters for different schedule times
deployment = flow.deploy(
name="my-deployment",
parameters={"name": "Default", "count": 1}, # default parameters
schedule={"interval": timedelta(hours=1)},
schedule_parameters=[
# Override parameters for specific schedule
{"parameters": {"name": "Morning Run", "count": 3}}
]
)
4. Run-time Parameter Override (via UI or API):
You can override parameters when starting a flow run, regardless of what's set in the deployment or schedule.
Here's a complete example showing multiple approaches:
python
from prefect import flow
from datetime import timedelta
@flow
def process_data(dataset_name: str, batch_size: int = 100):
print(f"Processing {dataset_name} in batches of {batch_size}")
return f"Processed {dataset_name}"
# For direct execution
if __name__ == "__main__":
# Run directly with parameters
process_data("test_data", batch_size=50)
# Create deployment with default parameters
deployment = process_data.deploy(
name="data-processor",
parameters={"dataset_name": "default_dataset", "batch_size": 200},
schedule={"interval": timedelta(days=1)},
schedule_parameters=[
{
"parameters": {
"dataset_name": "daily_dataset",
"batch_size": 300
}
}
]
)
The parameter precedence (from highest to lowest priority) is:
1. Parameters specified at run-time (via UI or API)
2. Schedule-specific parameters
3. Deployment default parameters
4. Flow default parameters (defined in the function signature)
This flexibility allows you to:
- Test flows locally with different parameters
- Set sensible defaults in your deployment
- Override parameters for specific scheduled runs
- Change parameters on-the-fly through the Prefect UI or API
Fernando Müller
07/11/2025, 1:08 AM
Marvin
07/11/2025, 1:08 AM
Marvin
07/11/2025, 1:08 AM
python
from prefect import flow
@flow
def process_data(dataset_name: str, batch_size: int = 100):
print(f"Processing {dataset_name} in batches of {batch_size}")
return f"Processed {dataset_name}"
# Create a deployment without any default parameters
deployment = process_data.deploy(
name="ui-parameters-demo",
)
if __name__ == "__main__":
deployment.apply()
After you run this script, you can go to the Prefect UI and:
1. Navigate to the "Deployments" page
2. Find your "ui-parameters-demo" deployment
3. Click "Run"
4. You'll see a form where you can input values for dataset_name and batch_size
5. Fill in the values you want
6. Click "Run deployment"
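The same run can also be triggered from the CLI instead of the UI; a sketch, assuming the flow keeps Prefect's default name (the function name with dashes):
```bash
# dataset_name is required; batch_size falls back to its default of 100 if omitted
prefect deployment run 'process-data/ui-parameters-demo' -p dataset_name="sales_data" -p batch_size=25
```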
The UI will show input fields for both parameters:
- dataset_name will be required (because it has no default value)
- batch_size will be optional and pre-filled with 100 (because that's the default in the function definition)
You can make an even simpler version if you don't need the if __name__ == "__main__": block:
python
from prefect import flow
@flow
def process_data(dataset_name: str, batch_size: int = 100):
print(f"Processing {dataset_name} in batches of {batch_size}")
return f"Processed {dataset_name}"
# Just deploy the flow
process_data.deploy(name="ui-parameters-demo")
This approach gives you maximum flexibility to:
- Set different parameter values for each run
- Experiment with different inputs
- Run the same flow with different parameters without changing the code
The UI will enforce the type hints you've defined (str for dataset_name and int for batch_size), helping prevent errors from invalid input types.
Fernando Müller
07/11/2025, 2:03 PM
Marvin
07/11/2025, 2:03 PM
Marvin
07/11/2025, 2:03 PM
python
from prefect import flow
@flow
def process_data(dataset_name: str, batch_size: int = 100):
print(f"Processing {dataset_name} in batches of {batch_size}")
return f"Processed {dataset_name}"
if __name__ == "__main__":
# Method 1: Pass parameters directly
process_data("my_dataset", batch_size=50)
# Method 2: Using a dictionary of parameters
parameters = {
"dataset_name": "another_dataset",
"batch_size": 75
}
process_data(**parameters)
# Method 3: Mix and match positional and keyword arguments
process_data("third_dataset", batch_size=200)
You can run this script directly with python your_script.py and it will execute the flow with your specified parameters.
Here's a more practical example that shows different ways to run it locally:
python
from prefect import flow
@flow
def process_data(dataset_name: str, batch_size: int = 100, debug: bool = False):
print(f"Processing {dataset_name} in batches of {batch_size}")
if debug:
print("Debug mode enabled")
return f"Processed {dataset_name}"
if __name__ == "__main__":
# Run with minimal parameters (using default batch_size)
result1 = process_data("small_dataset")
print(result1)
# Run with all parameters specified
result2 = process_data(
dataset_name="large_dataset",
batch_size=500,
debug=True
)
print(result2)
# Run with parameters from a configuration
test_config = {
"dataset_name": "test_dataset",
"batch_size": 10,
"debug": True
}
result3 = process_data(**test_config)
print(result3)
The flow will run in your local Prefect environment, and you'll see:
- The print statements in your console
- The flow run appearing in your local Prefect UI
- All parameters being tracked and visible in the UI
This is particularly useful for:
- Testing your flows during development
- Debugging with different parameter combinations
- Running quick experiments locally before deploying
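If you want the local UI mentioned above, a Prefect server can be started before running the script; a minimal sketch using the standard CLI:
```bash
# Start a local Prefect API and UI (defaults to http://127.0.0.1:4200)
prefect server start

# In a second terminal, run the flow script; the run appears in the UI
python your_script.py
```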
Remember, when running locally, the flow will still be tracked by Prefect, so you can monitor its execution and parameters in the UI, just like with deployed flows.
Fernando Müller
07/11/2025, 11:11 PM
Marvin
07/11/2025, 11:11 PM
Marvin
07/11/2025, 11:12 PM
python
from prefect import serve
# Create a process work pool
serve.create_work_pool(
name="my-process-pool",
work_pool_type="process"
)
# Start the worker
serve.start(work_pool_name="my-process-pool")
The Process work pool:
- Runs flows in separate Python processes
- Doesn't require any container setup
- Works right out of the box
- Perfect for local development or simple production setups
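The same pool can also be created and served from the command line; a sketch using the standard Prefect CLI (the pool name is illustrative):
```bash
# Create a process-type work pool
prefect work-pool create my-process-pool --type process

# Start a worker that polls it for scheduled flow runs
prefect worker start --pool my-process-pool
```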
To use it with your flows, just specify it in your deployment:
python
@flow
def my_flow(param: str):
print(f"Running with {param}")
# Deploy to your process work pool
my_flow.deploy(
name="my-deployment",
work_pool_name="my-process-pool"
)
If you need something a bit more robust but still simple to maintain, consider:
1. Docker work pool - If you're already using Docker:
python
serve.create_work_pool(
name="my-docker-pool",
work_pool_type="docker"
)
2. Kubernetes work pool - If you have a Kubernetes cluster:
python
serve.create_work_pool(
name="my-k8s-pool",
work_pool_type="kubernetes"
)
The Process work pool is the simplest to maintain because:
- No additional infrastructure required
- No container management
- Easy to debug (runs locally)
- Simple to monitor
- Quick to set up and tear down
Just remember to keep the worker running (the serve.start() command) for your flows to execute. You can run this as a service or in a separate terminal window during development.
Fernando Müller
07/11/2025, 11:14 PM
Marvin
07/11/2025, 11:15 PM
Marvin
07/11/2025, 11:16 PM
python
from prefect import serve
# On your EC2 instance
serve.create_work_pool(
name="ec2-process-pool",
work_pool_type="process"
)
# Start the worker
serve.start(work_pool_name="ec2-process-pool")
Docker Work Pool on EC2
Pros:
- Complete isolation between flow runs
- Reproducible environments through Dockerfiles
- Can run different flows with different Python versions/dependencies
- Easier to version control entire runtime environments
- Better security through container isolation
- Can test containers locally before deploying
Cons:
- Additional overhead from Docker runtime
- More complex setup (needs Docker installed and configured)
- Slightly slower start-up times for flow runs
- More disk space needed for images
- Need to manage Docker image builds and registry
Example Setup for Docker Pool on EC2:
python
from prefect import serve
# On your EC2 instance
serve.create_work_pool(
name="ec2-docker-pool",
work_pool_type="docker",
base_job_template={
"image": "your-registry/your-image:tag"
}
)
# Start the worker
serve.start(work_pool_name="ec2-docker-pool")
Recommendations:
Use Process Work Pool if:
- You have a small number of flows with compatible dependencies
- You want the simplest possible setup
- Quick startup time is crucial
- You're comfortable managing dependencies at the system level
- Resource efficiency is a priority
Use Docker Work Pool if:
- You have multiple flows with different dependency requirements
- You need reproducible environments
- Security isolation is important
- You're already using Docker in your infrastructure
- You want better environment version control
Real-world Example:
Let's say you have two flows with different Python version requirements:
python
# Flow 1 needs Python 3.8 and pandas 1.5
@flow
def process_legacy_data(data_path: str):
# Uses older pandas features
pass
# Flow 2 needs Python 3.11 and pandas 2.0
@flow
def process_new_data(data_path: str):
# Uses newer pandas features
pass
With a Process work pool, you'd have to find a compromise that works for both flows or maintain separate EC2 instances.
With a Docker work pool, you could have different Dockerfiles:
dockerfile
# Dockerfile.legacy
FROM python:3.8-slim
RUN pip install pandas==1.5.0 prefect
# Dockerfile.new
FROM python:3.11-slim
RUN pip install pandas==2.0.0 prefect
And specify different images in your deployments:
python
process_legacy_data.deploy(
name="legacy-deployment",
work_pool_name="ec2-docker-pool",
image="your-registry/legacy-flow:latest"
)
process_new_data.deploy(
name="new-deployment",
work_pool_name="ec2-docker-pool",
image="your-registry/new-flow:latest"
)
For an EC2 setup, I generally recommend starting with a Process work pool if you have simple requirements and consistent dependencies. You can always migrate to Docker later if you need more isolation or dependency flexibility.
Fernando Müller
07/12/2025, 1:05 PM
Marvin
07/12/2025, 1:05 PM
Marvin
07/12/2025, 1:06 PM
python
from prefect import flow
from prefect.filesystems import GitLab
# First, deploy your flow with GitLab as the source
@flow
def my_flow(param: str):
print(f"Running with {param}")
# Deploy specifying the GitLab source
deployment = my_flow.from_source(
source="<https://gitlab.com/your-username/your-repo>",
entrypoint="path/to/your/flow.py:my_flow" # relative path in repo
).deploy(
name="gitlab-flow",
work_pool_name="docker-gitlab-pool"
)
Then create your Docker work pool:
python
from prefect import serve
serve.create_work_pool(
name="docker-gitlab-pool",
work_pool_type="docker",
base_job_template={
"image": "prefecthq/prefect:2-python3.10", # or your custom image
"pull_policy": "always"
}
)
A few important points:
1. Authentication: If your GitLab repo is private, you'll need to set up authentication:
python
from prefect import serve
# Create GitLab block for authentication
gitlab_block = GitLab(
name="my-gitlab",
repository="<https://gitlab.com/your-username/your-repo>",
reference="main", # or specific branch/tag
access_token="your-gitlab-token" # store securely!
)
gitlab_block.save()
# Use in deployment
deployment = my_flow.from_source(
source=gitlab_block,
entrypoint="path/to/your/flow.py:my_flow"
).deploy(
name="gitlab-flow",
work_pool_name="docker-gitlab-pool"
)
2. Custom Docker Image: If your flows need specific dependencies, create a custom Dockerfile:
dockerfile
FROM prefecthq/prefect:2-python3.10
COPY requirements.txt .
RUN pip install -r requirements.txt
3. Complete Example with Dependencies:
python
from prefect import flow, serve
from prefect.filesystems import GitLab
# Create GitLab block
gitlab_block = GitLab(
name="my-gitlab",
repository="<https://gitlab.com/your-username/your-repo>",
reference="main",
)
gitlab_block.save()
# Create Docker work pool
serve.create_work_pool(
name="docker-gitlab-pool",
work_pool_type="docker",
base_job_template={
"image": "your-registry/your-custom-image:latest",
"pull_policy": "always"
}
)
# Deploy your flow
@flow
def my_flow(param: str):
print(f"Running with {param}")
deployment = my_flow.from_source(
source=gitlab_block,
entrypoint="flows/my_flow.py:my_flow"
).deploy(
name="gitlab-flow",
work_pool_name="docker-gitlab-pool"
)
The advantages of this setup are:
- Code is version controlled in GitLab
- Each flow run pulls the latest code automatically
- Environment is reproducible through Docker
- Can use different branches for development/staging/production
- Easy to manage dependencies through Docker images
Some tips for maintenance:
1. Use specific tags for your Docker images rather than 'latest'
2. Store sensitive information (like GitLab tokens) in Prefect blocks
3. Consider using GitLab CI/CD to automatically build and push your Docker images
4. Use branch protection rules in GitLab to ensure code quality
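As a sketch of what tip 3 would automate, the manual build-and-push steps look roughly like this (the registry path and tag are illustrative):
```bash
# Log in to the GitLab container registry, then build and push the flow image
docker login registry.gitlab.com
docker build -t registry.gitlab.com/your-username/your-repo/prefect-flows:v1.0.0 .
docker push registry.gitlab.com/your-username/your-repo/prefect-flows:v1.0.0
```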
To start the worker:
bash
prefect worker start -p docker-gitlab-pool
This setup gives you a robust, maintainable solution that keeps your code in GitLab while providing the isolation and reproducibility benefits of Docker.
Fernando Müller
07/12/2025, 1:08 PM
Marvin
07/12/2025, 1:08 PM
Marvin
07/12/2025, 1:10 PM
1. EC2 Instance with Docker
bash
# Install Docker on Ubuntu
sudo apt-get update
sudo apt-get install docker.io
sudo systemctl enable docker
sudo systemctl start docker
# Add your user to docker group
sudo usermod -aG docker $USER
# Basic security setup
sudo ufw allow ssh
sudo ufw enable
Pros:
- Simple to set up and maintain
- Direct control over the instance
- Easy to monitor and debug
- Good for small to medium workloads
Cons:
- Single point of failure
- Manual scaling
- Need to manage updates yourself
2. Amazon ECS (Elastic Container Service)
- Managed container service
- No need to manage the Docker daemon yourself
- AWS handles scaling and availability
Pros:
- Fully managed service
- Built-in scaling and high availability
- Integration with other AWS services
- Better security controls
Cons:
- More expensive
- More complex setup
- AWS-specific
3. Docker Swarm (Self-managed cluster)
bash
# On manager node
docker swarm init
# On worker nodes (using the token from manager)
docker swarm join --token <TOKEN> <MANAGER-IP>:2377
Pros:
- Built into Docker
- Simpler than Kubernetes
- Good for medium-scale deployments
- Platform-independent
Cons:
- Less feature-rich than Kubernetes
- Manual cluster management
Recommendation for Most Cases: EC2 with Docker
Here's a complete setup script for an EC2 instance:
bash
#!/bin/bash
# Update system
sudo apt-get update
sudo apt-get upgrade -y
# Install Docker
sudo apt-get install -y \
apt-transport-https \
ca-certificates \
curl \
software-properties-common
# Add Docker's official GPG key
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
# Add Docker repository
sudo add-apt-repository \
"deb [arch=amd64] <https://download.docker.com/linux/ubuntu> \
$(lsb_release -cs) \
stable"
# Install Docker CE
sudo apt-get update
sudo apt-get install -y docker-ce
# Start Docker service
sudo systemctl start docker
sudo systemctl enable docker
# Add user to Docker group
sudo usermod -aG docker $USER
# Basic security
sudo apt-get install -y ufw
sudo ufw default deny incoming
sudo ufw default allow outgoing
sudo ufw allow ssh
sudo ufw allow 2375/tcp # Docker API (only if needed)
sudo ufw enable
# Install Docker Compose
sudo curl -L "https://github.com/docker/compose/releases/download/v2.20.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
# Create Docker daemon configuration
sudo tee /etc/docker/daemon.json <<EOF
{
"log-driver": "json-file",
"log-opts": {
"max-size": "10m",
"max-file": "3"
},
"default-ulimits": {
"nofile": {
"Name": "nofile",
"Hard": 64000,
"Soft": 64000
}
}
}
EOF
# Restart Docker to apply changes
sudo systemctl restart docker
Important Security Considerations:
1. Network Security
bash
# Only allow necessary ports
sudo ufw allow ssh
sudo ufw allow https
# If you need Docker API access (be careful with this):
sudo ufw allow from YOUR_IP to any port 2375
2. Resource Limits
bash
# In /etc/docker/daemon.json
{
"default-ulimits": {
"nofile": {
"Name": "nofile",
"Hard": 64000,
"Soft": 64000
}
},
"storage-driver": "overlay2",
"log-driver": "json-file",
"log-opts": {
"max-size": "10m",
"max-file": "3"
}
}
3. Monitoring Setup
bash
# Install basic monitoring
sudo apt-get install -y htop ctop
# For Docker stats
docker stats
Integration with Prefect:
After setting up your Docker daemon, configure your Prefect work pool:
```python
from prefect import serve
serve.create_work_pool(
name="my-docker-pool",
work_pool_type="docker",
base_job_template={
"image": "prefecthq/prefect:2-python3.10",
"network_mode": "host", # if neededMarvin
07/12/2025, 1:10 PMStart your worker:
```bash
prefect worker start -p my-docker-pool
This setup provides a good balance of simplicity, maintainability, and scalability for most use cases. As your needs grow, you can consider migrating to ECS or setting up a Docker Swarm cluster.Fernando Müller
07/13/2025, 12:11 AMMarvin
07/13/2025, 12:11 AMMarvin
07/13/2025, 12:13 AMpython
from prefect import serve
serve.create_work_pool(
name="ecs-fargate-pool",
work_pool_type="ecs",
base_job_template={
"env": {"PREFECT_LOGGING_LEVEL": "INFO"},
"cpu": "256",
"memory": "512",
"launch_type": "FARGATE", # Specify Fargate
"network_configuration": {
"awsvpcConfiguration": {
"assignPublicIp": "ENABLED",
"securityGroups": ["sg-your-security-group"],
"subnets": ["subnet-your-subnet"]
}
}
}
)
2. Deploy your flow specifying the Fargate work pool:
python
from prefect import flow
@flow
def my_flow(param: str):
print(f"Processing {param}")
# Deploy to the Fargate work pool
deployment = my_flow.deploy(
name="fargate-deployment",
work_pool_name="ecs-fargate-pool",
image="your-image:tag" # Your Docker image
)
3. Start the worker:
bash
prefect worker start -p ecs-fargate-pool
Complete Example with GitLab Integration:
python
from prefect import flow, serve
from prefect.filesystems import GitLab
# Set up GitLab block for source code
gitlab_block = GitLab(
name="my-gitlab",
repository="<https://gitlab.com/your-username/your-repo>",
reference="main"
)
gitlab_block.save()
# Create ECS Fargate work pool
serve.create_work_pool(
name="ecs-fargate-pool",
work_pool_type="ecs",
base_job_template={
# Basic Fargate configuration
"launch_type": "FARGATE",
"cpu": "256",
"memory": "512",
# Networking (replace with your values)
"network_configuration": {
"awsvpcConfiguration": {
"assignPublicIp": "ENABLED",
"securityGroups": ["sg-xxxxx"],
"subnets": ["subnet-xxxxx"]
}
},
# Environment configuration
"env": {
"PREFECT_LOGGING_LEVEL": "INFO",
"AWS_DEFAULT_REGION": "your-region"
},
# Task execution role
"task_role_arn": "arn:aws:iam::your-account:role/your-task-role",
"execution_role_arn": "arn:aws:iam::your-account:role/your-execution-role"
}
)
# Deploy flow with GitLab source
@flow
def my_flow(param: str):
print(f"Processing {param}")
deployment = my_flow.from_source(
source=gitlab_block,
entrypoint="path/to/flow.py:my_flow"
).deploy(
name="fargate-deployment",
work_pool_name="ecs-fargate-pool",
image="your-registry/your-image:tag"
)
Advantages of Using Fargate:
- No EC2 instances to manage
- Pay only for resources used during task execution
- Automatic scaling
- Built-in security and isolation
- AWS manages the underlying infrastructure
Important Considerations:
1. IAM Roles
You'll need two IAM roles:
- Task Execution Role (for pulling images, logging)
- Task Role (for your flow's AWS permissions)
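As a sketch, both roles can be created with the AWS CLI; role names and file names below are illustrative, and the trust policy simply lets ECS tasks assume the role:
```bash
# Shared trust policy allowing ECS tasks to assume the role
cat > ecs-trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {"Service": "ecs-tasks.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

# Execution role: image pulls and CloudWatch logging
aws iam create-role --role-name prefect-ecs-execution-role \
  --assume-role-policy-document file://ecs-trust-policy.json
aws iam attach-role-policy --role-name prefect-ecs-execution-role \
  --policy-arn arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy

# Task role: attach whatever permissions your flow code itself needs
aws iam create-role --role-name prefect-ecs-task-role \
  --assume-role-policy-document file://ecs-trust-policy.json
```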
2. Networking
python
"network_configuration": {
"awsvpcConfiguration": {
"assignPublicIp": "ENABLED", # or "DISABLED" if using NAT Gateway
"securityGroups": ["sg-xxxxx"],
"subnets": ["subnet-xxxxx"]
}
}
3. Resource Configuration
python
"cpu": "256", # 0.25 vCPU
"memory": "512", # 512 MB RAM
Common Fargate configurations:
- 0.25 vCPU (256) / 512MB
- 0.5 vCPU (512) / 1GB
- 1 vCPU (1024) / 2GB
- 2 vCPU (2048) / 4GB
4. Cost Optimization
python
# Add tags for cost tracking
"tags": [
{"key": "Project", "value": "MyPrefectFlows"},
{"key": "Environment", "value": "Production"}
]
5. Logging
```python
"logging": {
"logDriver": "awslogs",
"options": {
"awslogs-group": "/prefect/flows",
"awslogs-region": "your-region",Marvin
07/13/2025, 12:13 AMFernando Müller
07/13/2025, 1:46 AMMarvin
07/13/2025, 1:46 AMMarvin
07/13/2025, 1:47 AMyaml
name: ecs-worker-guide
prefect-version: 2.14.20
- `name`: The project name
- `prefect-version`: Specifies which Prefect version this configuration is for
2. Build Section
yaml
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build_image
requires: prefect-docker>=0.3.1
image_name: <my-ecr-repo>
tag: latest
dockerfile: auto
- Defines how to build your Docker image
- `id: build_image`: Creates a reference ID you can use later
- `requires`: Specifies the required Prefect Docker package
- `image_name`: Your ECR repository URL
- `tag`: Docker image tag
- `dockerfile: auto`: Automatically detects the Dockerfile
3. Push Section
yaml
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker>=0.3.1
image_name: '{{ build_image.image_name }}'
tag: '{{ build_image.tag }}'
- Configures how to push the Docker image to your registry
- Uses variables from the build step (note the {{ }} syntax)
- Pushes to the same repository specified in the build step
4. Deployments Section
yaml
deployments:
- name: my_ecs_deployment
version:
tags: []
description:
entrypoint: flow.py:my_flow
parameters: {}
work_pool:
name: ecs-dev-pool
work_queue_name:
job_variables:
image: '{{ build_image.image }}'
schedules: []
- Defines how your flow should be deployed
- `name`: Name of your deployment
- `entrypoint`: Points to your flow file and function
- `work_pool`: Specifies the ECS work pool configuration
- `image`: References the Docker image from the build step
5. Pull Section
yaml
pull:
- prefect.deployments.steps.set_working_directory:
directory: /opt/prefect/ecs-worker-guide
- Configures where code should be pulled and executed
- Sets the working directory in the container
Here's how you might extend this for a more complete configuration:
yaml
name: ecs-worker-guide
prefect-version: 2.14.20
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build_image
requires: prefect-docker>=0.3.1
image_name: <my-ecr-repo>
tag: latest
dockerfile: auto
# Add build arguments if needed
build_args:
ENV: production
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker>=0.3.1
image_name: '{{ build_image.image_name }}'
tag: '{{ build_image.tag }}'
# Add AWS authentication if needed
aws_credentials: '{{ prefect.blocks.aws-credentials.production }}'
deployments:
- name: my_ecs_deployment
version: '1.0'
tags: ['production', 'ecs']
description: 'ECS deployment for my flow'
entrypoint: flow.py:my_flow
parameters:
batch_size: 100
timeout: 3600
work_pool:
name: ecs-dev-pool
work_queue_name: high-priority
job_variables:
cpu: '256'
memory: '512'
task_role_arn: 'arn:aws:iam::123456789012:role/my-task-role'
image: '{{ build_image.image }}'
schedules:
- cron: '0 0 * * *' # Daily at midnight
- interval: 3600 # Every hour
pull:
- prefect.deployments.steps.set_working_directory:
directory: /opt/prefect/ecs-worker-guide
Key Things to Note:
1. The {{ }} syntax is for variable interpolation
2. The build_image.image reference allows you to maintain consistency between build and deployment
3. You can add multiple deployments under the deployments section
4. The work_pool section can include ECS-specific configurations
5. The schedules section can include both cron and interval-based schedules
To use this file:
bash
# Deploy your flow
prefect deploy
# Start your worker
prefect worker start -p ecs-dev-pool
This configuration provides a complete pipeline from building your Docker image to deploying it on ECS Fargate, with proper versioning and scheduling.
Fernando Müller
07/13/2025, 1:54 AM
Marvin
07/13/2025, 1:54 AM
Marvin
07/13/2025, 1:56 AM
For ECR, the image_name should be the full repository URI: {aws-account-id}.dkr.ecr.{region}.amazonaws.com/{repository-name}
For example:
yaml
image_name: 123456789012.dkr.ecr.us-east-1.amazonaws.com/my-prefect-flows
Here's how to set it up:
1. Create an ECR Repository
bash
# Create the repository
aws ecr create-repository --repository-name my-prefect-flows
# Get login credentials for Docker
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-east-1.amazonaws.com
2. Update your Prefect YAML
yaml
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build_image
requires: prefect-docker>=0.3.1
image_name: 123456789012.dkr.ecr.us-east-1.amazonaws.com/my-prefect-flows
tag: latest
dockerfile: auto
3. Using Variables
You can make it more flexible using environment variables:
yaml
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build_image
requires: prefect-docker>=0.3.1
image_name: ${ECR_REGISTRY}/my-prefect-flows
tag: ${IMAGE_TAG:-latest}
dockerfile: auto
Then set your environment variables:
bash
export ECR_REGISTRY=123456789012.dkr.ecr.us-east-1.amazonaws.com
export IMAGE_TAG=v1.0.0
Complete Example with ECR Integration:
yaml
name: ecs-worker-guide
prefect-version: 2.14.20
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build_image
requires: prefect-docker>=0.3.1
image_name: ${ECR_REGISTRY}/my-prefect-flows
tag: ${IMAGE_TAG:-latest}
dockerfile: auto
build_args:
ENV: production
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker>=0.3.1
image_name: '{{ build_image.image_name }}'
tag: '{{ build_image.tag }}'
aws_credentials: '{{ prefect.blocks.aws-credentials.production }}'
deployments:
- name: my_ecs_deployment
entrypoint: flow.py:my_flow
work_pool:
name: ecs-dev-pool
job_variables:
cpu: '256'
memory: '512'
# Important: ECS needs permission to pull from ECR
task_role_arn: 'arn:aws:iam::123456789012:role/ecs-task-role'
execution_role_arn: 'arn:aws:iam::123456789012:role/ecs-execution-role'
image: '{{ build_image.image }}'
Required IAM Permissions:
1. ECS Task Execution Role needs:
json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ecr:GetAuthorizationToken",
"ecr:BatchCheckLayerAvailability",
"ecr:GetDownloadUrlForLayer",
"ecr:BatchGetImage"
],
"Resource": "*"
}
]
}
2. For pushing images, you need:
json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ecr:GetAuthorizationToken",
"ecr:BatchCheckLayerAvailability",
"ecr:GetDownloadUrlForLayer",
"ecr:GetRepositoryPolicy",
"ecr:DescribeRepositories",
"ecr:ListImages",
"ecr:DescribeImages",
"ecr:BatchGetImage",
"ecr:InitiateLayerUpload",
"ecr:UploadLayerPart",
"ecr:CompleteLayerUpload",
"ecr:PutImage"
],
"Resource": "arn:aws:ecr:region:account-id:repository/my-prefect-flows"
}
]
}
Best Practices:
1. Use Specific Tags: Avoid using latest in production
yaml
tag: ${GIT_SHA:-latest} # Use Git commit SHA as tag
2. Use Image Scanning:
bash
aws ecr put-image-scanning-configuration \
--repository-name my-prefect-flows \
--image-scanning-configuration scanOnPush=true
3. Set up Repository Policy:
```bash
aws ecr set-repository-policy \
  --repository-name my-prefect-flows \
  --policy-text file://repository-policy.json  # policy file name is illustrative
```
Marvin
07/13/2025, 1:56 AM
4. Use Image Lifecycle Policies to manage old images:
```bash
aws ecr put-lifecycle-policy \
  --repository-name my-prefect-flows \
  --lifecycle-policy-text file://lifecycle-policy.json
```
This setup provides a secure and maintainable way to manage your Prefect flow images in ECR, with proper access controls and lifecycle management.
Fernando Müller
07/13/2025, 1:57 AM
Marvin
07/13/2025, 1:57 AM
Marvin
07/13/2025, 1:58 AM
1. Using a requirements.txt file (recommended). First, create a requirements.txt in your project directory:
txt
pandas==2.1.0
prefect>=2.14.20
# other dependencies
Then in your `Dockerfile`:
dockerfile
FROM prefecthq/prefect:2-python3.10
# Copy requirements.txt first for better caching
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy the rest of your code
COPY . .
2. Directly in Dockerfile:
dockerfile
FROM prefecthq/prefect:2-python3.10
RUN pip install pandas==2.1.0 \
other-dependency==1.0.0 \
another-package==2.0.0
COPY . .
3. Using Poetry (if you prefer poetry for dependency management):
First, have a `pyproject.toml`:
toml
[tool.poetry]
name = "your-project"
version = "0.1.0"
description = ""
[tool.poetry.dependencies]
python = "^3.10"
pandas = "^2.1.0"
prefect = "^2.14.20"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
Then in your `Dockerfile`:
dockerfile
FROM prefecthq/prefect:2-python3.10
# Install poetry
RUN pip install poetry
# Copy poetry files
COPY pyproject.toml poetry.lock ./
# Install dependencies
RUN poetry config virtualenvs.create false \
&& poetry install --no-dev --no-interaction --no-ansi
# Copy your code
COPY . .
Here's a complete example of how your project structure might look:
your-project/
├── Dockerfile
├── requirements.txt
├── prefect.yaml
├── flows/
│ └── your_flow.py
└── .prefectignore
requirements.txt:
txt
pandas==2.1.0
prefect>=2.14.20
numpy>=1.24.0
scikit-learn>=1.3.0
Dockerfile:
dockerfile
FROM prefecthq/prefect:2-python3.10
# Install system dependencies if needed
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first for better layer caching
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy the rest of your code
COPY . .
# Set working directory
WORKDIR /opt/prefect/flows
prefect.yaml:
yaml
name: ecs-worker-guide
prefect-version: 2.14.20
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build_image
requires: prefect-docker>=0.3.1
image_name: your-ecr-repo/prefect-flows
tag: latest
dockerfile: Dockerfile # Explicitly specify the Dockerfile
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker>=0.3.1
image_name: '{{ build_image.image_name }}'
tag: '{{ build_image.tag }}'
deployments:
- name: my_ecs_deployment
entrypoint: flows/your_flow.py:my_flow
work_pool:
name: ecs-dev-pool
job_variables:
cpu: '256'
memory: '512'
image: '{{ build_image.image }}'
flows/your_flow.py:
python
from prefect import flow, task
import pandas as pd
@task
def process_data(df: pd.DataFrame):
# Your pandas operations here
return df.describe()
@flow
def my_flow(input_data: str):
df = pd.read_csv(input_data)
results = process_data(df)
return results
.prefectignore:
# Ignore files not needed in the Docker image
.git
.gitignore
.env
__pycache__
*.pyc
.pytest_cache
tests/
Best Practices:
1. Pin Your Dependencies:
txt
# requirements.txt
pandas==2.1.0 # Exact version
prefect==2.14.20 # Exact version
2. Use Multi-stage Builds for smaller images:
dockerfile
# Build stage
FROM prefecthq/prefect:2-python3.10 as builder
COPY requirements.txt .
RUN pip install --user -r requirements.txt
# Runtime stage
FROM prefecthq/prefect:2-python3.10
# Copy installed packages from builder
COPY --from=builder /root/.local /root/.local
COPY . .
# Ensure scripts in .local are usable
ENV PATH=/root/.local/bin:$PATH
3. Group Related Dependencies:
```txt
# requirements.txt
# Core dependencies
prefect==2.14.20
pandas==2.1.0
# Data processing
numpy==1.24.0
```
Marvin
07/13/2025, 1:58 AM
4. Consider Memory Requirements:
If you're using pandas, you might want to adjust your ECS task memory:
```yaml
work_pool:
  job_variables:
    cpu: '1024'     # 1 vCPU
    memory: '2048'  # 2GB RAM - good for pandas operations
```
This setup gives you a complete environment with all your dependencies properly installed and available to your flows when they run in ECS Fargate.
Fernando Müller
07/13/2025, 2:46 AM
Marvin
07/13/2025, 2:46 AM
Marvin
07/13/2025, 2:47 AM
1. Insufficient Resources:
yaml
work_pool:
job_variables:
cpu: '1024' # Try increasing (e.g., 1024 = 1 vCPU)
memory: '2048' # Try increasing (e.g., 2048 = 2GB)
2. Container Configuration Issues:
- Check your Dockerfile is properly configured
- Ensure all dependencies are correctly installed
- Verify the entrypoint is correct
3. Networking Issues:
- Check security groups
- Verify subnet configuration
- Ensure VPC endpoints are set up if needed
Here's a troubleshooting checklist:
1. Check CloudWatch Logs:
bash
# Get the task ID from the Prefect UI or AWS Console
aws logs get-log-events \
--log-group-name /ecs/your-task-definition \
--log-stream-name prefix/container-name/task-id
2. Verify Task Definition:
yaml
work_pool:
job_variables:
# Add logging configuration
logging:
logDriver: "awslogs"
options:
awslogs-group: "/ecs/prefect-flows"
awslogs-region: "your-region"
awslogs-stream-prefix: "prefect"
3. Check IAM Permissions:
Ensure your task has the necessary permissions:
json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ecr:GetAuthorizationToken",
"ecr:BatchCheckLayerAvailability",
"ecr:GetDownloadUrlForLayer",
"ecr:BatchGetImage",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
4. Debug Mode Configuration:
yaml
deployments:
- name: my_ecs_deployment
work_pool:
job_variables:
env:
PREFECT_LOGGING_LEVEL: "DEBUG"
PYTHONUNBUFFERED: "1"
5. Check Container Health:
Add a healthcheck to your Dockerfile:
dockerfile
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:4200')"
Common Solutions:
1. Memory Issues:
yaml
work_pool:
job_variables:
memory: '4096' # Increase memory
memory_reservation: '2048' # Set soft limit
2. Networking Issues:
yaml
work_pool:
job_variables:
network_configuration:
awsvpcConfiguration:
assignPublicIp: "ENABLED"
securityGroups: ["sg-xxxxx"]
subnets: ["subnet-xxxxx"]
3. Container Configuration:
dockerfile
# Add error handling in your Dockerfile
ENTRYPOINT ["python", "-u", "your_script.py"]
CMD ["--debug"]
4. Environment Variables:
yaml
work_pool:
job_variables:
env:
AWS_DEFAULT_REGION: "your-region"
PYTHONUNBUFFERED: "1"
PREFECT_API_URL: "http://your-prefect-api"
To get more detailed error information:
1. Enable detailed monitoring:
yaml
work_pool:
job_variables:
logging:
logDriver: "awslogs"
options:
awslogs-group: "/ecs/prefect-flows"
awslogs-region: "your-region"
awslogs-stream-prefix: "prefect"
awslogs-datetime-format: "%Y-%m-%d %H:%M:%S"
2. Add error handling in your flow:
python
from prefect import flow
import sys
@flow
def my_flow():
try:
# Your flow logic here
pass
except Exception as e:
print(f"Error occurred: {e}", file=sys.stderr)
raise
If you're still seeing the error after trying these solutions, check the CloudWatch logs for the specific task that failed - they should contain more detailed information about what caused the container to exit.
Fernando Müller
07/13/2025, 2:20 PM
Marvin
07/13/2025, 2:20 PM
Marvin
07/13/2025, 2:21 PM
The error fatal: not a git repository (or any of the parent directories): .git indicates that Prefect is trying to pull your code from a Git repository, but the Git configuration isn't properly set up in your container.
Let's fix this. You have two options:
1. Option 1: If you're using GitLab as your source, update your deployment configuration:
python
from prefect import flow
from prefect.filesystems import GitLab
# Create and save GitLab block if you haven't already
gitlab_block = GitLab(
name="my-gitlab",
repository="<https://gitlab.com/your-username/your-repo>",
reference="main", # or your branch name
access_token="your-gitlab-token" # if private repository
)
gitlab_block.save()
# Deploy using GitLab source
@flow
def my_flow():
pass
deployment = my_flow.from_source(
source=gitlab_block,
entrypoint="path/to/your/flow.py:my_flow"
).deploy(
name="ecs-deployment",
work_pool_name="your-ecs-pool"
)
2. Option 2: If you want to package your code with the Docker image instead:
dockerfile
# Dockerfile
FROM prefecthq/prefect:2-python3.10
# Copy your code into the image
COPY . /opt/prefect/flows
WORKDIR /opt/prefect/flows
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
Then update your deployment:
yaml
deployments:
- name: my_ecs_deployment
entrypoint: "flow.py:my_flow" # relative to /opt/prefect/flows
work_pool:
name: ecs-pool
job_variables:
image: "your-ecr-repo/your-image:tag"
3. Option 3: If you're using the Prefect YAML file:
yaml
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build_image
requires: prefect-docker>=0.3.1
image_name: your-ecr-repo/image
tag: latest
dockerfile: auto
deployments:
- name: my_ecs_deployment
entrypoint: flow.py:my_flow
work_pool:
name: ecs-pool
job_variables:
image: '{{ build_image.image }}'
pull:
- prefect.deployments.steps.git_clone:
repository: "<https://gitlab.com/your-username/your-repo>"
branch: "main"
access_token: "{{ prefect.blocks.secret.gitlab-token }}"
If you're using GitLab, make sure to:
1. Set up GitLab authentication:
bash
# Create a GitLab access token block
prefect block register -m prefect.blocks.system
prefect block create secret --name gitlab-token --value your-token
2. Update your work pool configuration to include necessary environment variables:
yaml
work_pool:
job_variables:
env:
PREFECT_GITLAB_ACCESS_TOKEN: "{{ prefect.blocks.secret.gitlab-token }}"
3. Ensure your ECS task has the correct permissions:
json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ecr:GetAuthorizationToken",
"ecr:BatchCheckLayerAvailability",
"ecr:GetDownloadUrlForLayer",
"ecr:BatchGetImage",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
Choose the option that best fits your deployment strategy:
- Option 1 (GitLab block) is good for development and when you want to pull fresh code for each run
- Option 2 (Docker packaging) is better for production where you want to version your code with your container
- Option 3 (YAML configuration) is good for declarative setups and CI/CD pipelines
Let me know which option you'd prefer, and I can provide more specific guidance!
Fernando Müller
07/13/2025, 2:34 PM
Marvin
07/13/2025, 2:34 PM
Marvin
07/13/2025, 2:35 PM
The issue is that your global pull configuration is being applied to all deployments. Let's restructure your prefect.yaml to handle both types of deployments correctly. Here's how you can modify it:
yaml
name: psp-prefect
description: PSP Prefect project.
# Build and push configurations for ECS deployments
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build_image
requires: prefect-docker>=0.3.1
image_name: 148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo
tag: latest
dockerfile: auto
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker>=0.3.1
image_name: "{{ build_image.image_name }}"
tag: "{{ build_image.tag }}"
# Remove the global pull configuration and specify it per deployment
deployments:
# Managed pool deployments with GitLab source
- name: test-flow-deployment
description: A test flow that makes an API call using requests.
entrypoint: flows/test/test_flow.py:test_flow
parameters: {}
work_pool:
name: default-work-pool
job_variables:
pip_packages:
- requests
pull:
- prefect.deployments.steps.git_clone:
repository: https://gitlab.com/psp-gt-dev/psp-prefect.git
credentials: "{{ prefect.blocks.gitlab-credentials.gitlab-credentials }}"
- name: powerplay-opportunities-sync-deployment
description: A flow that syncs opportunities from Powerplay to HubSpot.
entrypoint: flows/powerplay_leads_sync/sync_powerplay_opportunities_to_hubspot_flow.py:powerplay_opportunities_to_hubspot
parameters: {}
work_pool:
name: default-work-pool
job_variables:
pip_packages:
- requests
- hubspot-api-client
- pandas
- snowflake-connector-python
- prefect-snowflake
pull:
- prefect.deployments.steps.git_clone:
repository: https://gitlab.com/psp-gt-dev/psp-prefect.git
credentials: "{{ prefect.blocks.gitlab-credentials.gitlab-credentials }}"
# ... other managed pool deployments with the same pull configuration ...
# ECS deployment with Docker image
- name: ecs-test-deployment
description: Test flow running on ECS Fargate
entrypoint: flows/ecs-test-flow/my_test_flow.py:my_flow
parameters: {}
work_pool:
name: ecs-pool
job_variables:
image: "{{ build_image.image }}"
cpu: 512
memory: 1024
pull:
- prefect.deployments.steps.set_working_directory:
directory: /opt/prefect/psp-prefect
Key changes made:
1. Removed the global pull configuration
2. Added specific pull configurations for each managed pool deployment
3. Kept the ECS deployment with its own pull configuration using set_working_directory
This setup will:
- Use GitLab source for managed pool deployments
- Use the Docker image for ECS deployments
- Prevent the Git clone step from running for ECS deployments
For the ECS deployment, make sure your Dockerfile copies the code:
dockerfile
FROM prefecthq/prefect:2-python3.10
# Copy your code into the image
COPY . /opt/prefect/psp-prefect
WORKDIR /opt/prefect/psp-prefect
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
This way:
- Managed pool deployments will pull code from GitLab at runtime
- ECS deployments will use the code baked into the Docker image
- No more Git-related errors for ECS deployments
Fernando Müller
07/13/2025, 2:52 PM
Marvin
07/13/2025, 2:52 PM
Marvin
07/13/2025, 2:53 PM
dockerfile
# Use Prefect's official image as base
FROM prefecthq/prefect:2-python3.10
# Set working directory in container
WORKDIR /opt/prefect/psp-prefect
# Install system dependencies if needed (example)
RUN apt-get update && apt-get install -y \
gcc \
python3-dev \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements first for better layer caching
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt \
# Add any specific packages needed for your flows
requests \
pandas \
snowflake-connector-python[pandas] \
prefect-snowflake \
hubspot-api-client
# Copy your project files
# This copies everything from your project directory into the container
COPY . .
# Set environment variables if needed
ENV PYTHONUNBUFFERED=1 \
PREFECT_LOGGING_LEVEL=INFO
# The container will run as the prefect user (security best practice)
USER prefect
# No need for CMD or ENTRYPOINT as Prefect will handle that
Save this as Dockerfile in your project root. Here's what each section does:
1. Base Image: Uses Prefect's official image which includes Python and Prefect
dockerfile
FROM prefecthq/prefect:2-python3.10
2. Working Directory: Sets up where your code will live
dockerfile
WORKDIR /opt/prefect/psp-prefect
3. System Dependencies: If you need any system packages
dockerfile
RUN apt-get update && apt-get install -y \
gcc \
python3-dev \
&& rm -rf /var/lib/apt/lists/*
4. Python Dependencies: Installs your Python packages
dockerfile
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
5. Project Files: Copies your code into the container
dockerfile
COPY . .
6. Environment Setup: Sets any needed environment variables
dockerfile
ENV PYTHONUNBUFFERED=1 \
PREFECT_LOGGING_LEVEL=INFO
You can build and test this locally:
bash
# Build the image
docker build -t psp-prefect:latest .
# Test the image
docker run --rm psp-prefect:latest python -c "from prefect import flow; print('Prefect installation working')"
For your specific flows, you might want to add:
dockerfile
# Additional dependencies for your specific flows
RUN pip install --no-cache-dir \
requests \
hubspot-api-client \
pandas \
snowflake-connector-python[pandas] \
prefect-snowflake
# If you need specific Snowflake dependencies
RUN pip install --no-cache-dir \
snowflake-connector-python[pandas,secure-local-storage]
# If you need timezone data
RUN apt-get update && apt-get install -y \
tzdata \
&& rm -rf /var/lib/apt/lists/*
# If you need AWS CLI
RUN pip install --no-cache-dir \
awscli
# If you need git
RUN apt-get update && apt-get install -y \
git \
&& rm -rf /var/lib/apt/lists/*
And for better security and optimization:
dockerfile
# Multi-stage build for smaller final image
FROM prefecthq/prefect:2-python3.10 as builder
WORKDIR /opt/prefect/psp-prefect
COPY requirements.txt .
RUN pip install --user --no-cache-dir -r requirements.txt
FROM prefecthq/prefect:2-python3.10
# Copy installed packages from builder
COPY --from=builder /root/.local /root/.local
WORKDIR /opt/prefect/psp-prefect
COPY . .
# Ensure scripts in .local are usable
ENV PATH=/root/.local/bin:$PATH \
PYTHONUNBUFFERED=1 \
PREFECT_LOGGING_LEVEL=INFO
# Run as non-root user
USER prefect
This will give you a secure, optimized container for your ECS deployments. Remember to:
1. Keep the image size small by removing unnecessary files
2. Use multi-stage builds when possible
3. Run as non-root user
4. Cache layers effectively by ordering operations properly
You can save this as Dockerfile in your project root and reference it in your prefect.yaml:
```yaml
build:
- prefect_docker.deployments.steps.build_docker_image:
id: build_image
requires: prefect-docker>=0.3.1
```
Marvin
07/13/2025, 2:53 PM
Fernando Müller
07/13/2025, 3:33 PM
Marvin
07/13/2025, 3:33 PM
Marvin
07/13/2025, 3:35 PM
python
from prefect import flow
from typing import Optional
@flow
def my_flow(param: Optional[str] = None):
print(f"Running ECS flow with param: {param}")
return "Flow completed successfully!"
if __name__ == "__main__":
# Create deployment
deployment = my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool",
job_variables={
"image": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest>",
"cpu": "512",
"memory": "1024",
"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
},
# Optional: Add other ECS-specific configurations
"network_configuration": {
"awsvpcConfiguration": {
"assignPublicIp": "ENABLED",
"securityGroups": ["your-security-group-id"],
"subnets": ["your-subnet-id"]
}
},
# Optional: Add tags
"tags": [
{"key": "Environment", "value": "test"},
{"key": "Project", "value": "PSP"}
]
},
description="Test flow running on ECS Fargate",
version="1",
tags=["ecs", "test"],
parameters={}, # Default parameters if needed
)
For a more complete example with error handling and configuration:
```python
from prefect import flow
from typing import Optional
import os
from prefect.logging import get_run_logger
@flow(log_prints=True)
def my_flow(param: Optional[str] = None):
logger = get_run_logger()
logger.info(f"Starting flow with param: {param}")
try:
# Your flow logic here
print(f"Running ECS flow with param: {param}")
return "Flow completed successfully!"
except Exception as e:
logger.error(f"Flow failed with error: {e}")
raise
def create_deployment(
image_uri: str = "148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest",
cpu: str = "512",
memory: str = "1024",
env_vars: dict = None
):
"""
Create an ECS deployment with the specified configuration.
Args:
image_uri: ECR image URI
cpu: CPU units ('256', '512', '1024', etc.)
memory: Memory in MB ('512', '1024', '2048', etc.)
env_vars: Additional environment variables
"""
# Default environment variables
default_env = {
"PREFECT_LOGGING_LEVEL": "DEBUG",
"AWS_DEFAULT_REGION": "us-east-2"
}
# Merge with provided env vars
if env_vars:
default_env.update(env_vars)
# Create deployment
deployment = my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool",
job_variables={
"image": image_uri,
"cpu": cpu,
"memory": memory,
"env": default_env,
"network_configuration": {
"awsvpcConfiguration": {
"assignPublicIp": "ENABLED",
"securityGroups": [os.getenv("ECS_SECURITY_GROUP_ID")],
"subnets": [os.getenv("ECS_SUBNET_ID")]
}
},
"tags": [
{"key": "Environment", "value": "test"},
{"key": "Project", "value": "PSP"},
{"key": "ManagedBy", "value": "Prefect"}
]
},
description="Test flow running on ECS Fargate",
version="1",
tags=["ecs", "test"],
parameters={
"param": "default_value" # Optional default parameter
}
)
return deployment
if __name__ == "__main__":
# Set up environment variables if needed
os.environ["ECS_SECURITY_GROUP_ID"] = "your-security-group-id"
os.environ["ECS_SUBNET_ID"] = "your-subnet-id"
# Create the deployment
try:
    deployment = create_deployment()
    print(f"Deployment created successfully: {deployment.name}")
except Exception as e:
    print(f"Failed to create deployment: {e}")
```
Marvin
07/13/2025, 3:35 PM
Save this as something like `ecs_deployment.py`. To use it:
1. Make sure your ECS work pool is created:
```bash
prefect work-pool create "ecs-pool" --type ecs:push
2. Run the script:
bash
python ecs_deployment.py
3. Start the worker:
bash
prefect worker start -p ecs-pool
This setup provides:
- Proper error handling
- Configurable resources (CPU/memory)
- Custom environment variables
- Network configuration
- Logging
- Parameter defaults
- AWS resource tagging
You can also add schedules:
python
from datetime import timedelta
deployment = my_flow.deploy(
# ... other configurations ...
schedules=[
{
"interval": timedelta(hours=1),
"anchor_date": "2023-01-01T00:00:00Z",
"timezone": "UTC"
},
{
"cron": "0 0 * * *", # Daily at midnight
"timezone": "UTC"
}
]
)
Fernando Müller
07/13/2025, 3:36 PM
Marvin
07/13/2025, 3:36 PM
Marvin
07/13/2025, 3:37 PM
1. First, your flow file (flows/ecs-test-flow/my_test_flow.py):
python
from prefect import flow
from typing import Optional
from prefect.logging import get_run_logger
@flow(log_prints=True)
def my_flow(param: Optional[str] = None):
logger = get_run_logger()
logger.info(f"Starting flow with param: {param}")
try:
# Your flow logic here
print(f"Running ECS flow with param: {param}")
return "Flow completed successfully!"
except Exception as e:
logger.error(f"Flow failed with error: {e}")
raise
if __name__ == "__main__":
# For local testing
my_flow("test_param")
2. Then, your deployment file (deployments/ecs_deployment.py
):
python
import os
from prefect.deployments import Deployment
from flows.ecs_test_flow.my_test_flow import my_flow
def create_ecs_deployment(
image_uri: str = "148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest",
cpu: str = "512",
memory: str = "1024",
env_vars: dict = None
):
"""
Create an ECS deployment with the specified configuration.
"""
# Default environment variables
default_env = {
"PREFECT_LOGGING_LEVEL": "DEBUG",
"AWS_DEFAULT_REGION": "us-east-2"
}
# Merge with provided env vars
if env_vars:
default_env.update(env_vars)
# Create deployment
deployment = my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool",
job_variables={
"image": image_uri,
"cpu": cpu,
"memory": memory,
"env": default_env,
"network_configuration": {
"awsvpcConfiguration": {
"assignPublicIp": "ENABLED",
"securityGroups": [os.getenv("ECS_SECURITY_GROUP_ID")],
"subnets": [os.getenv("ECS_SUBNET_ID")]
}
},
"tags": [
{"key": "Environment", "value": "test"},
{"key": "Project", "value": "PSP"},
{"key": "ManagedBy", "value": "Prefect"}
]
},
description="Test flow running on ECS Fargate",
version="1",
tags=["ecs", "test"],
parameters={
"param": "default_value"
}
)
return deployment
if __name__ == "__main__":
# Set up environment variables if needed
os.environ["ECS_SECURITY_GROUP_ID"] = "your-security-group-id"
os.environ["ECS_SUBNET_ID"] = "your-subnet-id"
# Create the deployment
try:
deployment = create_ecs_deployment(
env_vars={
"CUSTOM_VAR": "custom_value"
}
)
print(f"Deployment created successfully: {deployment.name}")
except Exception as e:
print(f"Failed to create deployment: {e}")
3. Optional: Create a configuration file (deployments/config.py) for deployment settings:
python
from typing import Dict
ECS_DEPLOYMENT_CONFIG: Dict = {
"image_uri": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest>",
"cpu": "512",
"memory": "1024",
"default_env_vars": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
"AWS_DEFAULT_REGION": "us-east-2"
},
"network_config": {
"security_groups": ["sg-xxxxx"],
"subnets": ["subnet-xxxxx"]
},
"tags": [
{"key": "Environment", "value": "test"},
{"key": "Project", "value": "PSP"}
]
}
4. Then update your deployment file to use the config:
```python
import os
from prefect.deployments import Deployment
from flows.ecs_test_flow.my_test_flow import my_flow
from deployments.config import ECS_DEPLOYMENT_CONFIG
def create_ecs_deployment(
config: dict = ECS_DEPLOYMENT_CONFIG,
env_vars: dict = None
) -> Deployment:
"""Marvin
07/13/2025, 3:37 PMYour project structure would look like this:
your-project/
├── flows/
│ └── ecs_test_flow/
│ └── my_test_flow.py
├── deployments/
│   ├── __init__.py
│ ├── config.py
│ └── ecs_deployment.py
├── Dockerfile
├── requirements.txt
└── prefect.yaml
To use this:
1. Run the deployment script:
```bash
python -m deployments.ecs_deployment
2. Or import and use in other scripts:
python
from deployments.ecs_deployment import create_ecs_deployment
deployment = create_ecs_deployment(
env_vars={
"CUSTOM_VAR": "custom_value"
}
)
This structure gives you:
- Clear separation of concerns
- Reusable deployment configuration
- Easy maintenance and updates
- Ability to version control deployment configurations separately
- Simple testing of flows independently of deployments
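The step-4 snippet above is cut off in the capture. A minimal sketch of what the config-driven version might look like, assuming the ECS_DEPLOYMENT_CONFIG keys from step 3 and the same my_flow.deploy() call shown earlier:
python
from flows.ecs_test_flow.my_test_flow import my_flow
from deployments.config import ECS_DEPLOYMENT_CONFIG

def create_ecs_deployment(config: dict = ECS_DEPLOYMENT_CONFIG, env_vars: dict = None):
    """Create the ECS deployment from the shared config dict (sketch)."""
    # Merge any extra env vars over the defaults defined in the config
    env = {**config["default_env_vars"], **(env_vars or {})}
    return my_flow.deploy(
        name="ecs-test-deployment",
        work_pool_name="ecs-pool",
        job_variables={
            "image": config["image_uri"],
            "cpu": config["cpu"],
            "memory": config["memory"],
            "env": env,
            "network_configuration": {
                "awsvpcConfiguration": {
                    "assignPublicIp": "ENABLED",
                    "securityGroups": config["network_config"]["security_groups"],
                    "subnets": config["network_config"]["subnets"],
                }
            },
            "tags": config["tags"],
        },
        description="Test flow running on ECS Fargate",
        tags=["ecs", "test"],
    )

if __name__ == "__main__":
    deployment_id = create_ecs_deployment(env_vars={"CUSTOM_VAR": "custom_value"})
    print(f"Deployment created: {deployment_id}")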
Remember to update your prefect.yaml to remove the ECS deployment if you're moving it to Python-based deployment.
Fernando Müller
07/13/2025, 10:18 PMMarvin
07/13/2025, 10:18 PMMarvin
07/13/2025, 10:18 PM1. Your prefect.yaml has a global pull configuration that's being applied to all deployments
2. Or your deployment is inheriting Git clone behavior from somewhere
Let's fix this step by step:
1. First, check your prefect.yaml and remove any global pull configuration:
Before:
yaml
# Remove this global pull section
pull:
- prefect.deployments.steps.git_clone:
repository: https://gitlab.com/psp-gt-dev/psp-prefect.git
credentials: "{{ prefect.blocks.gitlab-credentials.gitlab-credentials }}"
deployments:
- name: ecs-test-deployment
...
After:
yaml
deployments:
- name: ecs-test-deployment
description: Test flow running on ECS Fargate
work_pool:
name: ecs-pool
job_variables:
image: "{{ build_image.image }}"
pull:
- prefect.deployments.steps.set_working_directory:
directory: /opt/prefect/psp-prefect
2. If you're using Python-based deployment, make sure you're not including any Git-related steps:
python
from prefect.deployments import Deployment
from flows.ecs_test_flow.my_test_flow import my_flow
def create_ecs_deployment():
deployment = my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool",
job_variables={
"image": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest>",
"cpu": "512",
"memory": "1024",
},
# Make sure you're not including any pull steps here
)
return deployment
3. Make sure your Dockerfile correctly copies the code:
dockerfile
FROM prefecthq/prefect:2-python3.10
# Set working directory
WORKDIR /opt/prefect/psp-prefect
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy your code into the image
COPY . .
# No need for git operations
ENV PYTHONUNBUFFERED=1
4. Verify your deployment configuration:
python
deployment = my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool",
job_variables={
"image": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest>",
"cpu": "512",
"memory": "1024",
"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
},
},
# If you need to set a working directory, use this instead of git clone
pull_steps=[
{
"prefect.deployments.steps.set_working_directory": {
"directory": "/opt/prefect/psp-prefect"
}
}
]
)
5. Check that your work pool is correctly configured:
bash
# List your work pools
prefect work-pool ls
# Inspect your ECS work pool
prefect work-pool inspect ecs-pool
6. Make sure your deployment is using the correct work pool type:
bash
# Create a new ECS work pool if needed
prefect work-pool create "ecs-pool" --type ecs:push
The key points to remember are:
- ECS deployments should use the Docker image, not Git clone
- Remove any global pull configurations from prefect.yaml
- Make sure your code is properly copied into the Docker image
- Use set_working_directory instead of git_clone for ECS deployments
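If it isn't obvious where the git_clone step is coming from, one way to check what a deployment actually registered is to read it back through the Prefect client. A rough sketch, assuming a recent Prefect version where get_client() and read_deployment_by_name() are available and the flow/deployment name is my-flow/ecs-test-deployment:
python
import asyncio
from prefect import get_client

async def show_pull_steps(full_name: str = "my-flow/ecs-test-deployment"):
    # Read the deployment back from the API and print its pull steps.
    # If a git_clone step shows up here, the deployment itself carries the Git step.
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(full_name)
        print(deployment.pull_steps)

if __name__ == "__main__":
    asyncio.run(show_pull_steps())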
If you're still seeing the error, you can add debug logging to see where the Git operation is being triggered:
```python
deployment = my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool",
job_variables={
"image": "148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest",
"cpu": "512",
"memory": "1024",Marvin
07/13/2025, 10:18 PMFernando Müller
07/13/2025, 10:27 PMMarvin
07/13/2025, 10:27 PMMarvin
07/13/2025, 10:27 PMIt looks like you're using DockerImage in a way that might be causing issues. Let me help you adjust the deployment file to match the current best practices for ECS deployments:
python
from flows.ecs_test_flow.my_test_flow import my_flow
from prefect._versioning import VersionType
if __name__ == "__main__":
my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool",
job_variables={
"image": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest>", # Specify image directly here
"cpu": 1024,
"memory": 4096,
"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
"PREFECT_API_RESPONSE_LIMIT": "100",
}
},
description="Test flow running on ECS Fargate",
tags=["ecs", "test"],
parameters={},
version=None,
version_type=VersionType.SIMPLE,
# Explicitly set the working directory
pull_steps=[
{
"prefect.deployments.steps.set_working_directory": {
"directory": "/opt/prefect/psp-prefect"
}
}
]
)
Key changes made:
1. Removed the DockerImage class usage
2. Specified the image directly in job_variables
3. Added pull_steps to set the working directory
Also, make sure your Dockerfile is correctly set up:
dockerfile
FROM prefecthq/prefect:2-python3.10
# Set working directory
WORKDIR /opt/prefect/psp-prefect
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy your code into the image
COPY . .
# Set environment variables
ENV PYTHONUNBUFFERED=1
If you're still seeing the Git error, you might need to check if there's a global pull configuration in your prefect.yaml. Could you share your current prefect.yaml file? That might help identify if there are any Git-related configurations that need to be removed.
Also, verify that your ECS work pool is correctly configured:
bash
# Check work pool type
prefect work-pool inspect ecs-pool
# If needed, create a new one
prefect work-pool create "ecs-pool" --type ecs:push
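If you'd rather check the pool type from Python than from the CLI, a small sketch along these lines should work, assuming your Prefect version's client exposes read_work_pool():
python
import asyncio
from prefect import get_client

async def show_work_pool_type(name: str = "ecs-pool"):
    # Print the work pool type so you can confirm 'ecs' vs 'ecs:push'
    async with get_client() as client:
        pool = await client.read_work_pool(work_pool_name=name)
        print(f"{pool.name}: type={pool.type}")

if __name__ == "__main__":
    asyncio.run(show_work_pool_type())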
Fernando Müller
07/13/2025, 10:31 PMMarvin
07/13/2025, 10:31 PMMarvin
07/13/2025, 10:31 PM1. The pull_steps parameter isn't available for deploy()
2. Your prefect.yaml looks correct - it has individual pull configurations per deployment rather than a global one
Let's modify your deployment script to match your YAML configuration. Here's the corrected version:
python
from flows.ecs_test_flow.my_test_flow import my_flow
from prefect._versioning import VersionType
if __name__ == "__main__":
my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool",
job_variables={
"image": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest>",
"cpu": 1024,
"memory": 4096,
"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
"PREFECT_API_RESPONSE_LIMIT": "100",
},
# Add working directory configuration here
"working_dir": "/opt/prefect/psp-prefect"
},
description="Test flow running on ECS Fargate",
tags=["ecs", "test"],
parameters={},
version=None,
version_type=VersionType.SIMPLE
)
Also, let's verify a few things:
1. Make sure your ECS work pool is of type `ecs:push`:
bash
prefect work-pool inspect ecs-pool
If it's not, create a new one:
bash
prefect work-pool create "ecs-pool" --type ecs:push
2. Verify your Dockerfile is copying the code correctly:
dockerfile
FROM prefecthq/prefect:2-python3.10
WORKDIR /opt/prefect/psp-prefect
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy your code into the image
COPY . .
ENV PYTHONUNBUFFERED=1
3. Make sure your image is built and pushed correctly:
bash
# Build the image
docker build -t 148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest .
# Login to ECR
aws ecr get-login-password --region us-east-2 | docker login --username AWS --password-stdin 148761666380.dkr.ecr.us-east-2.amazonaws.com
# Push the image
docker push 148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest
If you're still seeing the Git error, it would be helpful to see:
1. The complete error message from the logs
2. The output of prefect work-pool inspect ecs-pool
3. Whether the error occurs immediately or after some other steps
Fernando Müller
07/13/2025, 10:34 PMMarvin
07/13/2025, 10:34 PMMarvin
07/13/2025, 10:34 PMI can see the issue: your work pool type is 'ecs' when it should be 'ecs:push'. This could be why you're seeing the Git-related error.
Let's fix this:
1. First, delete the existing work pool:
bash
prefect work-pool delete ecs-pool
2. Create a new work pool with the correct type:
bash
prefect work-pool create ecs-pool --type ecs:push
3. Then try your deployment again:
python
from flows.ecs_test_flow.my_test_flow import my_flow
from prefect._versioning import VersionType
if __name__ == "__main__":
my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool", # This should now use the ecs:push work pool
job_variables={
"image": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest>",
"cpu": 1024,
"memory": 4096,
"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
"PREFECT_API_RESPONSE_LIMIT": "100",
}
},
description="Test flow running on ECS Fargate",
tags=["ecs", "test"],
parameters={},
version=None,
version_type=VersionType.SIMPLE
)
After creating the new work pool, verify it's set up correctly:
bash
prefect work-pool inspect ecs-pool
You should see type='ecs:push' in the output. This type tells Prefect to use the push-based execution model, which is what you want for ECS deployments. The regular ecs type might be trying to pull code, which is why you're seeing the Git error.
Fernando Müller
07/13/2025, 11:34 PMMarvin
07/13/2025, 11:34 PMMarvin
07/13/2025, 11:35 PM1. Update your Dockerfile to use an ARM-compatible base image:
dockerfile
# Use ARM-compatible base image
FROM --platform=linux/arm64 prefecthq/prefect:2-python3.10
WORKDIR /opt/prefect/psp-prefect
# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
ENV PYTHONUNBUFFERED=1
2. Build and push for ARM:
bash
# Build for ARM
docker buildx build --platform linux/arm64 \
-t 148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest \
.
# Push to ECR
docker push 148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest
3. Update your deployment script to specify ARM configuration:
python
from flows.ecs_test_flow.my_test_flow import my_flow
from prefect._versioning import VersionType
if __name__ == "__main__":
my_flow.deploy(
name="ecs-test-deployment-arm",
work_pool_name="ecs-pool",
job_variables={
"image": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest>",
"cpu": 1024,
"memory": 4096,
"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
"PREFECT_API_RESPONSE_LIMIT": "100",
},
# Specify ARM platform
"platform_version": "1.4.0", # Required for ARM
"runtime_platform": {
"cpuArchitecture": "ARM64",
"operatingSystemFamily": "LINUX"
}
},
description="Test flow running on ECS Fargate ARM",
tags=["ecs", "test", "arm"],
parameters={},
version=None,
version_type=VersionType.SIMPLE
)
4. Or in your `prefect.yaml`:
yaml
deployments:
- name: ecs-test-deployment-arm
description: Test flow running on ECS Fargate ARM
entrypoint: flows/ecs-test-flow/my_test_flow.py:my_flow
parameters: {}
work_pool:
name: ecs-pool
job_variables:
image: "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest>"
cpu: 1024
memory: 4096
platform_version: "1.4.0"
runtime_platform:
cpuArchitecture: "ARM64"
operatingSystemFamily: "LINUX"
5. Make sure your ECS work pool supports ARM:
bash
prefect work-pool create ecs-arm-pool --type ecs:push
Some additional considerations:
1. Cost Optimization: ARM instances can be more cost-effective:
python
job_variables={
# ... other configs ...
"capacity_provider_strategy": [
{
"capacityProvider": "FARGATE_SPOT",
"weight": 1
}
]
}
2. Resource Requirements: ARM might need different resource allocations:
python
job_variables={
"cpu": 1024, # 1 vCPU
"memory": 4096, # 4GB RAM
"resource_requirements": [
{
"type": "CPU",
"value": "1024"
},
{
"type": "MEMORY",
"value": "4096"
}
]
}
3. Monitoring: Add tags to track ARM vs x86 costs:
python
job_variables={
# ... other configs ...
"tags": [
{"key": "Architecture", "value": "ARM64"},
{"key": "Environment", "value": "production"}
]
}
4. Multi-architecture Support: If you need both ARM and x86:
dockerfile
# Use buildx to create multi-arch images
FROM --platform=$TARGETPLATFORM prefecthq/prefect:2-python3.10
And build:
```bash
docker buildx build --platform linux/amd64,linux/arm64 \
Marvin
07/13/2025, 11:35 PMFernando Müller
07/13/2025, 11:58 PMMarvin
07/13/2025, 11:59 PMMarvin
07/13/2025, 11:59 PMTry specifying the ARM architecture with requiresCompatibilities and platformFamily instead:
python
from flows.ecs_test_flow.my_test_flow import my_flow
from prefect._versioning import VersionType
if __name__ == "__main__":
my_flow.deploy(
name="ecs-test-deployment-arm",
work_pool_name="ecs-pool",
job_variables={
"image": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest>",
"cpu": 1024,
"memory": 4096,
"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
"PREFECT_API_RESPONSE_LIMIT": "100",
},
# Specify ARM using requiresCompatibilities and platformFamily
"requiresCompatibilities": ["FARGATE"],
"platformFamily": "ARM64",
# Optional: specify launch type
"launchType": "FARGATE"
},
description="Test flow running on ECS Fargate ARM",
tags=["ecs", "test", "arm"],
parameters={},
version=None,
version_type=VersionType.SIMPLE
)
Or in your `prefect.yaml`:
yaml
deployments:
- name: ecs-test-deployment-arm
description: Test flow running on ECS Fargate ARM
entrypoint: flows/ecs-test-flow/my_test_flow.py:my_flow
parameters: {}
work_pool:
name: ecs-pool
job_variables:
image: "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest>"
cpu: 1024
memory: 4096
requiresCompatibilities: ["FARGATE"]
platformFamily: "ARM64"
launchType: "FARGATE"
Make sure your Docker image is built for ARM:
dockerfile
# Use ARM-compatible base image
FROM --platform=linux/arm64 prefecthq/prefect:2-python3.10
And build it specifically for ARM:
bash
# Build for ARM
docker buildx build --platform linux/arm64 \
-t 148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:arm64-latest \
.
If this still doesn't work, we might need to:
1. Create a new work pool specifically for ARM
2. Check if your AWS region supports ARM on Fargate
3. Verify the task definition is being created correctly
Could you share any error messages you're seeing? That would help diagnose the issue more precisely.
Fernando Müller
07/14/2025, 12:04 AMMarvin
07/14/2025, 12:05 AMMarvin
07/14/2025, 12:05 AMThe family variable in an ECS work pool is used to specify the name of the task definition family. It's essentially a way to group different versions of related task definitions together in ECS.
Here's how you can use it:
python
my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool",
job_variables={
"image": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest>",
"family": "psp-prefect-tasks", # This groups your task definitions
"cpu": 1024,
"memory": 4096
}
)
Or in `prefect.yaml`:
yaml
work_pool:
name: ecs-pool
job_variables:
image: "{{ build_image.image }}"
family: "psp-prefect-tasks"
cpu: 1024
memory: 4096
The family value:
- Should be unique for different types of tasks
- Can contain letters (upper and lower), numbers, hyphens, and underscores
- Is used to track different versions of similar tasks
- Helps with organization and management of task definitions
Some common patterns for naming families:
python
# By project and environment
"family": "psp-prefect-prod"
"family": "psp-prefect-dev"
# By function
"family": "psp-data-sync-tasks"
"family": "psp-processing-tasks"
# By resource requirements
"family": "psp-high-memory-tasks"
"family": "psp-standard-tasks"
# By service
"family": "powerplay-sync-tasks"
"family": "servicetitan-sync-tasks"
When AWS creates task definition revisions, they'll be grouped under this family name, making it easier to:
- Track changes over time
- Rollback to previous versions if needed
- Organize different types of tasks
- Manage permissions and policies
For example, if you're running different types of flows:
python
# PowerPlay sync flows
my_powerplay_flow.deploy(
name="powerplay-sync",
work_pool_name="ecs-pool",
job_variables={
"family": "powerplay-sync-tasks",
"cpu": 1024,
"memory": 4096
}
)
# ServiceTitan flows
my_servicetitan_flow.deploy(
name="servicetitan-sync",
work_pool_name="ecs-pool",
job_variables={
"family": "servicetitan-sync-tasks",
"cpu": 2048,
"memory": 8192
}
)
This organization helps with:
1. Resource Management: Group tasks with similar resource needs
2. Monitoring: Easier to track specific types of tasks
3. Permissions: Can set IAM roles per family
4. Cost Tracking: Better ability to track costs by task type
5. Maintenance: Easier to update similar tasks together
Best practices for family names:
1. Be descriptive but concise
2. Use consistent naming conventions
3. Include environment if relevant
4. Consider including resource requirements if significant
5. Use hyphens for readability
Example of a complete deployment with family:
python
my_flow.deploy(
name="ecs-test-deployment",
work_pool_name="ecs-pool",
job_variables={
"image": "<http://148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest|148761666380.dkr.ecr.us-east-2.amazonaws.com/psp-prefect-ecr-repo:latest>",
"family": "psp-prefect-prod",
"cpu": 1024,
"memory": 4096,
"env": {
"PREFECT_LOGGING_LEVEL": "DEBUG",
"ENVIRONMENT": "production"
},
"tags": [
{"key": "Family", "value": "psp-prefect-prod"},
{"key": "Environment", "value": "production"}
]
},
description="Production ECS task for PSP Prefect flows",
tags=["ecs", "production"]
)
Fernando Müller
07/14/2025, 5:36 PMMarvin
07/14/2025, 5:36 PMMarvin
07/14/2025, 5:37 PMBy default, each flow run registers a new task definition revision, which can add up to a large number of RegisterTaskDefinition operations if you're running many flows concurrently.
3. Can it be avoided?
There are a few strategies to manage this:
a. Use a Pre-defined Task Definition:
- You can specify a pre-existing task definition ARN in your work pool configuration
- This approach requires careful consideration of logging configuration since the flow run name won't be automatically included in the log stream prefix
b. Manage Task Definition Cleanup:
- Regularly deregister old task definition revisions to prevent accumulation
- AWS maintains the last 100 revisions of a task definition family
4. Best Practices:
- If you're hitting rate limits, consider implementing a cleanup strategy for old task definitions
- If you need to use a fixed task definition, be aware that you might lose some of the automatic logging configuration benefits (a sketch of this option follows below)
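As a sketch of option (a), pinning runs to a task definition you registered yourself might look roughly like this. The task_definition_arn job variable name and the ARN itself are assumptions here - confirm the exact variable name against your work pool's base job template (prefect work-pool inspect ecs-pool) before relying on it:
python
from flows.ecs_test_flow.my_test_flow import my_flow

if __name__ == "__main__":
    my_flow.deploy(
        name="ecs-test-deployment-pinned",
        work_pool_name="ecs-pool",
        job_variables={
            # Reuse a task definition registered out of band instead of letting
            # Prefect register a new revision for each run.
            # NOTE: variable name assumed; check your work pool's job template.
            "task_definition_arn": "arn:aws:ecs:us-east-2:148761666380:task-definition/psp-prefect-tasks:1",
            "cpu": 1024,
            "memory": 4096,
        },
        description="ECS deployment pinned to a pre-registered task definition",
        tags=["ecs", "pinned-task-definition"],
    )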
If you're experiencing specific issues with task definition revisions (like rate limits), let me know and I can provide more targeted solutions for your use case. Also, if you'd like to see the specific configuration options for either approach, I'd be happy to provide those details.
Fernando Müller
07/14/2025, 5:39 PMMarvin
07/14/2025, 5:39 PMMarvin
07/14/2025, 5:40 PMHere are a few ways to clean up old task definition revisions:
1. AWS CLI Approach (manual or scripted):
bash
# List task definition families
aws ecs list-task-definition-families
# List all revisions for a family
aws ecs list-task-definitions --family-prefix your-family-name
# Deregister a specific task definition
aws ecs deregister-task-definition --task-definition family:revision
2. Python Script Solution (More automated):
python
import boto3
def cleanup_task_definitions(family_prefix, keep_last_n=5):
"""
Clean up old task definitions, keeping the N most recent ones.
Args:
family_prefix (str): Task definition family name
keep_last_n (int): Number of most recent revisions to keep
"""
ecs = boto3.client('ecs')
# Get all task definitions for the family
task_defs = ecs.list_task_definitions(
familyPrefix=family_prefix,
sort='DESC' # Gets newest first
)['taskDefinitionArns']
# Keep the N most recent, deregister the rest
for task_def_arn in task_defs[keep_last_n:]:
try:
print(f"Deregistering {task_def_arn}")
ecs.deregister_task_definition(
taskDefinition=task_def_arn
)
except Exception as e:
print(f"Error deregistering {task_def_arn}: {e}")
# Example usage
# cleanup_task_definitions('your-family-name', keep_last_n=5)
3. As a Prefect Flow (Integrated solution):
python
from prefect import flow, task
import boto3
@task
def get_task_definitions(family_prefix):
ecs = boto3.client('ecs')
return ecs.list_task_definitions(
familyPrefix=family_prefix,
sort='DESC'
)['taskDefinitionArns']
@task
def deregister_task_definition(task_def_arn):
ecs = boto3.client('ecs')
return ecs.deregister_task_definition(
taskDefinition=task_def_arn
)
@flow
def cleanup_ecs_task_definitions(family_prefix: str, keep_last_n: int = 5):
task_defs = get_task_definitions(family_prefix)
for task_def_arn in task_defs[keep_last_n:]:
deregister_task_definition(task_def_arn)
print(f"Deregistered {task_def_arn}")
# Example usage
# cleanup_ecs_task_definitions("your-family-name", keep_last_n=5)
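If you want this cleanup to run on its own, one lightweight option is to serve the flow with a cron schedule. A sketch (serve() keeps a local process running, so you could equally deploy it to your ECS pool like the other flows; the family name and retention count are placeholders):
python
if __name__ == "__main__":
    # Run the cleanup flow every Monday at 03:00 in the machine's timezone.
    cleanup_ecs_task_definitions.serve(
        name="weekly-task-definition-cleanup",
        cron="0 3 * * 1",
        parameters={"family_prefix": "your-family-name", "keep_last_n": 5},
    )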
Important Notes:
1. Deregistering a task definition:
- Doesn't delete it immediately
- Prevents it from being used for new tasks
- The task definition remains discoverable for some time
- Running tasks using that definition aren't affected
2. Best Practices:
- Keep at least a few recent revisions for rollback purposes
- Consider running cleanup regularly (e.g., weekly)
- Check that task definitions aren't in use before deregistering (see the sketch after these notes)
- Be careful with the keep_last_n parameter to ensure you retain enough revisions
3. AWS IAM Permissions:
You'll need these permissions in your IAM role:
- ecs:ListTaskDefinitions
- ecs:DescribeTaskDefinition
- ecs:DeregisterTaskDefinition
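For the "not in use" check mentioned in the best practices above, a rough boto3 sketch; the cluster name is a placeholder, and you'd want pagination if the cluster runs more than 100 tasks:
python
import boto3

def task_definition_in_use(task_def_arn: str, cluster: str = "your-cluster-name") -> bool:
    """Return True if any running task in the cluster uses this task definition."""
    ecs = boto3.client("ecs")
    task_arns = ecs.list_tasks(cluster=cluster, desiredStatus="RUNNING")["taskArns"]
    if not task_arns:
        return False
    tasks = ecs.describe_tasks(cluster=cluster, tasks=task_arns)["tasks"]
    return any(t["taskDefinitionArn"] == task_def_arn for t in tasks)

# Example: only deregister when nothing is running against the revision
# if not task_definition_in_use(task_def_arn):
#     boto3.client("ecs").deregister_task_definition(taskDefinition=task_def_arn)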
Would you like me to elaborate on any of these approaches or provide more specific guidance for your use case?