Vitali
06/29/2025, 3:21 PM
Marvin
06/29/2025, 3:21 PM
Marvin
06/29/2025, 3:23 PM
`DockerContainer` infrastructure block. This allows you to:
- Run pre-built Docker images
- Pass environment variables and commands
- Configure resource requirements
- Handle container lifecycle
2. Configuration Setup
Create a `prefect.yaml` file to define your deployment:
yaml
deployments:
  - name: docker-flow
    version: 1
    tags: []
    description: "Runs a pre-built Docker container"
    schedule: {}
    flow_name: my_docker_flow
    entrypoint: flow.py:my_flow
    infrastructure:
      type: docker-container
      env: {}
      image: your-image:tag
      image_pull_policy: ALWAYS
      command: []
3. Deployment Creation
You can create the deployment using either:
- The CLI: prefect deploy
- Python code:
python
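from prefect import flow
from prefect.deployments import Deployment
# Prefect 2.x only: the prefect.infrastructure module was removed in Prefect 3
from prefect.infrastructure import DockerContainer

# Infrastructure block describing the image the flow run executes in
docker_container = DockerContainer(
    image="your-image:tag",
    image_pull_policy="ALWAYS",
)

@flow
def my_flow():
    # Your flow logic here
    pass

if __name__ == "__main__":
    # In Prefect 2.x an infrastructure block is attached to a deployment
    # with Deployment.build_from_flow(), then registered with apply()
    Deployment.build_from_flow(
        flow=my_flow,
        name="docker-flow",
        infrastructure=docker_container,
    ).apply()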
4. Best Practices
- Use specific image tags rather than `latest` to ensure reproducibility
- Configure appropriate resource limits in your Docker infrastructure
- Consider using Prefect's built-in retry mechanisms for resilient container execution
- Use environment variables for sensitive configuration
- Implement proper logging within your containers to leverage Prefect's logging infrastructure
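To make the first point concrete, here is a minimal sketch of a check that rejects floating tags before a deployment is created. The helper name `is_pinned` is hypothetical (it is not part of Prefect or Docker), and it assumes the usual `registry/repo:tag` or `repo@sha256:...` reference forms:

```python
def is_pinned(image: str) -> bool:
    """Return True if the reference has a digest or an explicit tag other than 'latest'."""
    if "@" in image:
        # Digest reference such as repo@sha256:...
        return True
    # A tag can only appear in the last path segment; a ':' earlier in the
    # reference belongs to a registry port such as localhost:5000.
    last_segment = image.rsplit("/", 1)[-1]
    if ":" not in last_segment:
        # No tag given, so Docker falls back to 'latest'
        return False
    tag = last_segment.rsplit(":", 1)[1]
    return tag not in ("", "latest")
```

A deployment script could call this on the image name and fail early when it returns `False`.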
5. Advanced Features
- You can use private container registries by configuring appropriate credentials
- Scale horizontally by running multiple containers across different workers
- Implement health checks and container lifecycle management
- Chain multiple container executions in a single flow
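As a rough sketch of the last point, chaining comes down to running the steps in order and stopping at the first non-zero exit code. `run_chain` and `DEMO_STEPS` are hypothetical names; the demo steps are plain Python one-liners standing in for `docker run ...` commands so that the sketch runs without Docker:

```python
import subprocess
import sys

def run_chain(commands):
    """Run each command in order; raise on the first non-zero exit code."""
    outputs = []
    for cmd in commands:
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"Step {cmd!r} failed: {result.stderr}")
        outputs.append(result.stdout)
    return outputs

# Stand-ins for commands such as ["docker", "run", "--rm", "your-image:tag"]
DEMO_STEPS = [
    [sys.executable, "-c", "print('extract')"],
    [sys.executable, "-c", "print('transform')"],
]
```

Inside a flow, each step would typically be its own task so that retries and logs are tracked per container.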
For more detailed information and examples, check out:
- Docker Infrastructure Guide
- Creating Deployments
- Scalable Microservices with Prefect and Docker
Would you like me to elaborate on any particular aspect of Docker orchestration with Prefect?
Vitali
06/29/2025, 3:33 PM
`your-image:tag`. Is it correct?
Marvin
06/29/2025, 3:34 PM
Marvin
06/29/2025, 3:34 PM
python
from prefect import flow
from prefect.infrastructure import DockerContainer

docker_container = DockerContainer(
    image="your-image:tag"
)

@flow
def my_flow():
    # This flow code runs INSIDE the container
    pass
This approach packages and runs your Prefect flow code inside the specified container. The container needs Python and Prefect installed.
2. Running a Pre-built Docker Container AS PART of a Flow (probably what you want):
python
import shlex
import subprocess
from typing import Optional

from prefect import flow, task

@task
def run_docker_container(image_name: str, command: Optional[str] = None):
    # Build the `docker run` argument list; shlex keeps quoted arguments intact
    cmd = ["docker", "run", image_name]
    if command:
        cmd.extend(shlex.split(command))
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Container failed: {result.stderr}")
    return result.stdout

@flow
def orchestrate_container():
    # This flow runs your pre-built container
    result = run_docker_container("your-image:tag", "your command here")
    return result
Or using the Docker Python SDK for more control:
python
from typing import Optional

import docker
from prefect import flow, task

@task
def run_container(image_name: str, command: Optional[str] = None):
    # Connect to the Docker daemon using the local environment settings
    client = docker.from_env()
    container = client.containers.run(
        image_name,
        command=command,
        detach=True,
    )
    # Wait for container to complete
    result = container.wait()
    logs = container.logs().decode()
    if result["StatusCode"] != 0:
        raise RuntimeError(f"Container failed: {logs}")
    return logs

@flow
def orchestrate_container():
    result = run_container("your-image:tag", "your command here")
    return result
The key differences are:
- In approach #1, your Prefect flow code runs INSIDE the container
- In approach #2, your Prefect flow ORCHESTRATES the container from outside
For orchestrating pre-built Docker images, approach #2 is what you want. This allows you to:
- Run any pre-built Docker image without modifying it
- Chain multiple container executions
- Handle container outputs and errors within your flow
- Add Prefect's observability and retry logic around container execution
- Pass parameters and environment variables to your containers
- Manage container lifecycle (start, stop, cleanup)
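For the parameter and environment-variable point, here is a minimal sketch of how the `docker run` argument list from the subprocess example could be assembled. `build_docker_run_cmd` is a hypothetical helper; `--rm` and `-e` are standard `docker run` flags:

```python
import shlex

def build_docker_run_cmd(image, command=None, env=None, remove=True):
    """Assemble a `docker run` argument list suitable for subprocess.run."""
    cmd = ["docker", "run"]
    if remove:
        # Remove the container once it exits
        cmd.append("--rm")
    for key, value in (env or {}).items():
        # One -e flag per environment variable
        cmd.extend(["-e", f"{key}={value}"])
    cmd.append(image)
    if command:
        # shlex.split keeps quoted arguments together
        cmd.extend(shlex.split(command))
    return cmd
```

Passing the result as a list to `subprocess.run` avoids shell quoting problems, since no shell is involved.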
Would you like me to provide more details about any specific aspect of container orchestration with Prefect?
Vitali
06/29/2025, 3:37 PM
`prefect_docker` for approach #2?
Marvin
06/29/2025, 3:37 PM
Marvin
06/29/2025, 3:38 PM
bash
pip install prefect-docker
2. Here's a basic example of running a container:
python
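from prefect import flow
# Assumption: a `DockerContainer` class with a `run()` method, as used here.
# The tasks documented for prefect_docker.containers are create_docker_container,
# start_docker_container and get_docker_container_logs; verify against your version.
from prefect_docker.containers import DockerContainer

@flow
def run_container_flow():
    container = DockerContainer(
        image="your-image:tag",
        command="your-command",  # optional
        environment={"ENV_VAR": "value"},  # optional
        auto_remove=True,  # automatically remove container when done
    )
    # This runs the container and returns the logs
    result = container.run()
    return result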
3. You can also use more advanced features:
python
from prefect import flow
from prefect_docker.containers import DockerContainer

@flow
def advanced_container_flow():
    container = DockerContainer(
        image="your-image:tag",
        command="your-command",
        environment={"ENV_VAR": "value"},
        auto_remove=True,
        volumes=["/host/path:/container/path"],
        network_mode="host",
        stream_output=True,  # stream logs in real-time
    )
    # Run with additional options
    result = container.run(
        pull_image=True,  # force pull latest image
        raise_on_error=True,  # raise exception if container exits with non-zero
    )
    return result
Key benefits of using `prefect-docker`:
- Proper integration with Prefect's task runners and logging
- Built-in retry logic and error handling
- Stream container logs directly to Prefect's logging system
- Easy configuration of volumes, networks, and environment variables
- Automatic container cleanup
- Support for private registries through Docker credentials
The `prefect-docker` collection also provides additional utilities like:
- `docker_pull` - Pull images explicitly
- `docker_build` - Build images from Dockerfiles
- `docker_push` - Push images to registries
For example, if you need to pull an image first:
python
from prefect import flow
from prefect_docker.containers import DockerContainer
# Assumption: `docker_pull` as named here; the task documented for
# prefect_docker.images is `pull_docker_image`, so verify against your version.
from prefect_docker.images import docker_pull

@flow
def pull_and_run_flow():
    # Pull the image first
    docker_pull("your-image:tag")
    # Then run the container
    container = DockerContainer(
        image="your-image:tag",
        auto_remove=True,
    )
    return container.run()
This approach using `prefect-docker` is much more robust than using raw Docker commands or the Docker SDK directly, as it's specifically designed to work with Prefect's execution model and provides better observability and control.
Would you like to see examples of any specific Docker container scenarios with `prefect-docker`?
Vitali
06/29/2025, 3:43 PM
Marvin
06/29/2025, 3:43 PM
Marvin
06/29/2025, 3:44 PM
python
from prefect import flow
from prefect_docker.containers import DockerContainer

@flow
def process_data():
    # Run a pandas data processing container
    container = DockerContainer(
        image="pandas-processor:latest",
        command="python process.py --input /data/input.csv --output /data/output.csv",
        volumes=["/local/data:/data"],  # Mount local directory
        environment={
            "POSTGRES_HOST": "localhost",
            "POSTGRES_PASSWORD": "secret",
        },
    )
    return container.run()
2. ML Training with GPU
python
from prefect import flow
from prefect_docker.containers import DockerContainer

@flow
def train_model(dataset_path: str, epochs: int):
    container = DockerContainer(
        image="tensorflow/tensorflow:latest-gpu",
        command=f"python train.py --epochs {epochs}",
        volumes=[f"{dataset_path}:/data"],
        device_requests=[{"Driver": "nvidia", "Count": 1}],  # Request GPU
        environment={"NVIDIA_VISIBLE_DEVICES": "all"},
        stream_output=True,  # See training progress in real-time
    )
    return container.run()
3. Chaining Multiple Containers
python
from prefect import flow
from prefect_docker.containers import DockerContainer

@flow
def etl_pipeline():
    # Extract data
    extract_container = DockerContainer(
        image="data-extractor:latest",
        volumes=["/tmp/data:/data"],
        auto_remove=True,
    )
    extract_result = extract_container.run()

    # Transform data
    transform_container = DockerContainer(
        image="data-transformer:latest",
        volumes=["/tmp/data:/data"],  # Same volume as extractor
        environment={"PROCESSING_MODE": "batch"},
        auto_remove=True,
    )
    transform_result = transform_container.run()

    # Load data
    load_container = DockerContainer(
        image="data-loader:latest",
        volumes=["/tmp/data:/data"],
        environment={
            "DB_HOST": "warehouse.example.com",
            "DB_PASSWORD": "secret",
        },
        auto_remove=True,
    )
    load_result = load_container.run()

    return {
        "extract": extract_result,
        "transform": transform_result,
        "load": load_result,
    }
4. Using Private Registry with Authentication
python
from prefect import flow
from prefect_docker.containers import DockerContainer
from prefect_docker.images import docker_pull

@flow
def run_private_container():
    # Pull from private registry
    docker_pull(
        "private.registry.com/myapp:latest",
        auth_config={
            "username": "user",
            "password": "secret",
        },
    )
    container = DockerContainer(
        image="private.registry.com/myapp:latest",
        auto_remove=True,
    )
    return container.run()
5. Container with Error Handling and Retries
```python
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect_docker.containers import DockerContainer

@task(
    retries=3,
    retry_delay_seconds=30,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def run_container_with_retries(image: str, command: str):
    container = DockerContainer(
        image=image,
        command=command,
        auto_remove=True,
        stream_output=True,
    )
    try:
        result = container.run(raise_on_error=True)
        return result
    except Exception as e:
        print(f"Container failed: {e}")
        raise

@flow
def resilient_container_flow():
    try: