Anish S
07/15/2025, 8:04 PM
Marvin
07/15/2025, 8:04 PM
Marvin
07/15/2025, 8:05 PM
```bash
# Create a Docker work pool
prefect work-pool create "docker-pool" --type docker
# Start a worker
prefect worker start -p "docker-pool"
# Run your deployment
prefect deployment run my-flow/docker-deployment
```
Some important notes:
1. Make sure Docker is installed and running on your system.
2. The Docker infrastructure will automatically handle:
- Building the Docker image (at deployment time, when an image build is configured)
- Running your flow run in a container
- Managing container lifecycle
- Reporting the run's state and results back to Prefect
3. If you need to customize the Docker environment, you can:
- Provide a custom Dockerfile
- Specify additional build arguments
- Set environment variables
- Mount volumes
- Configure network settings
4. For production use, consider:
- Using specific image tags instead of 'latest'
- Properly managing secrets and environment variables
- Setting resource limits for your containers
- Implementing proper error handling
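For the point about secrets in item 4, one option is to keep secret values out of the `docker run` argument list. The sketch below assumes you launch the container yourself with `subprocess`. It relies on `docker run -e NAME` (with no `=value`), which forwards `NAME` from the environment of the calling process, so the value does not show up in the command line. The helper names `build_cmd_with_forwarded_env` and `run_with_secrets` are hypothetical, not part of Prefect.

```python
import os
import subprocess
from typing import Dict, List, Tuple


def build_cmd_with_forwarded_env(
    image: str, secrets: Dict[str, str]
) -> Tuple[List[str], Dict[str, str]]:
    """Return (docker_cmd, child_env) with secret values kept out of argv."""
    docker_cmd = ["docker", "run", "--rm"]
    for name in secrets:
        # "-e NAME" without "=value" asks Docker to copy NAME from the
        # environment of the docker CLI process.
        docker_cmd.extend(["-e", name])
    docker_cmd.append(image)
    # The values travel only through the child process environment.
    child_env = {**os.environ, **secrets}
    return docker_cmd, child_env


def run_with_secrets(
    image: str, secrets: Dict[str, str]
) -> subprocess.CompletedProcess:
    """Run the container, passing secrets via the environment only."""
    docker_cmd, child_env = build_cmd_with_forwarded_env(image, secrets)
    return subprocess.run(
        docker_cmd, env=child_env, check=True, text=True, capture_output=True
    )
```

Where the secret values come from is up to you, for example the worker's environment or a Prefect Secret block.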
The Docker infrastructure in Prefect 3.x provides a robust way to isolate and scale your tasks while maintaining consistency across different environments. You can find more detailed information in the Prefect documentation about Docker infrastructure.
Would you like me to elaborate on any particular aspect of Docker-based tasks in Prefect, such as custom Dockerfile configuration or advanced Docker settings?
Anish S
07/15/2025, 8:07 PM
Marvin
07/15/2025, 8:07 PM
Marvin
07/15/2025, 8:09 PM
```python
from prefect import task
import subprocess
from typing import List, Optional, Dict


@task
def run_docker_container(
    image: str,
    command: Optional[List[str]] = None,
    environment: Optional[Dict[str, str]] = None,
    volumes: Optional[List[str]] = None,
    network: Optional[str] = None,
    remove: bool = True
) -> subprocess.CompletedProcess:
    """
    Run a Docker container as a Prefect task.

    Args:
        image: Docker image name
        command: Command to run in the container
        environment: Environment variables to pass to the container
        volumes: Volume mappings
        network: Docker network to use
        remove: Whether to remove the container after execution

    Returns:
        CompletedProcess instance with return code and output
    """
    # Build the docker command
    docker_cmd = ["docker", "run"]

    # Add optional parameters
    if remove:
        docker_cmd.append("--rm")
    if environment:
        for key, value in environment.items():
            docker_cmd.extend(["-e", f"{key}={value}"])
    if volumes:
        for volume in volumes:
            docker_cmd.extend(["-v", volume])
    if network:
        docker_cmd.extend(["--network", network])

    # Add the image name
    docker_cmd.append(image)

    # Add the command if specified
    if command:
        docker_cmd.extend(command)

    try:
        # Run the container and wait for it to exit
        result = subprocess.run(
            docker_cmd,
            check=True,
            text=True,
            capture_output=True
        )
        return result
    except subprocess.CalledProcessError as e:
        # Chain the original exception so the exit code stays visible
        raise RuntimeError(f"Docker container failed: {e.stderr}") from e
```
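One way to make the task above easier to test is to split command construction from execution. The sketch below mirrors the argument handling in `run_docker_container`; the helper name `build_docker_run_cmd` is an assumption for illustration and is not part of Prefect.

```python
from typing import Dict, List, Optional


def build_docker_run_cmd(
    image: str,
    command: Optional[List[str]] = None,
    environment: Optional[Dict[str, str]] = None,
    volumes: Optional[List[str]] = None,
    network: Optional[str] = None,
    remove: bool = True,
) -> List[str]:
    """Build the `docker run` argument list without executing anything."""
    docker_cmd = ["docker", "run"]
    if remove:
        docker_cmd.append("--rm")
    for key, value in (environment or {}).items():
        docker_cmd.extend(["-e", f"{key}={value}"])
    for volume in volumes or []:
        docker_cmd.extend(["-v", volume])
    if network:
        docker_cmd.extend(["--network", network])
    docker_cmd.append(image)
    docker_cmd.extend(command or [])
    return docker_cmd


# Inspect the command without needing a Docker daemon.
print(build_docker_run_cmd("alpine:latest", command=["echo", "hi"]))
# ['docker', 'run', '--rm', 'alpine:latest', 'echo', 'hi']
```

The task body could then call this helper and pass the result to `subprocess.run`, which keeps the Docker-dependent part small.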
2. Async Version (if you need non-blocking container execution):
```python
from prefect import task
import asyncio
from typing import List, Optional, Dict


@task
async def run_docker_container_async(
    image: str,
    command: Optional[List[str]] = None,
    environment: Optional[Dict[str, str]] = None,
    volumes: Optional[List[str]] = None,
    stream_logs: bool = True
) -> str:
    """
    Run a Docker container asynchronously as a Prefect task.

    Args:
        image: Docker image name
        command: Command to run in the container
        environment: Environment variables to pass to the container
        volumes: Volume mappings
        stream_logs: Whether to stream container logs
            (accepted but not used by this implementation)

    Returns:
        Container output
    """
    docker_cmd = ["docker", "run", "--rm"]

    if environment:
        for key, value in environment.items():
            docker_cmd.extend(["-e", f"{key}={value}"])
    if volumes:
        for volume in volumes:
            docker_cmd.extend(["-v", volume])

    docker_cmd.append(image)
    if command:
        docker_cmd.extend(command)

    process = await asyncio.create_subprocess_exec(
        *docker_cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # Output is collected in full and only returned after the container exits
    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        raise RuntimeError(f"Docker container failed: {stderr.decode()}")
    return stdout.decode()
```
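The async task above declares `stream_logs` but only returns output after the container exits. A minimal sketch of what line-by-line streaming could look like with `asyncio` follows. The function name `run_and_stream` and the `on_line` callback are assumptions for illustration; inside a Prefect task the callback might be the `info` method of a run logger.

```python
import asyncio
import sys
from typing import Callable, List


async def run_and_stream(cmd: List[str], on_line: Callable[[str], None]) -> int:
    """Run cmd, pass each output line to on_line as it arrives, return the exit code."""
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        # Merge stderr into stdout so a single reader drains both streams.
        stderr=asyncio.subprocess.STDOUT,
    )
    assert process.stdout is not None
    # StreamReader supports async iteration and yields one line at a time.
    async for raw_line in process.stdout:
        on_line(raw_line.decode(errors="replace").rstrip())
    return await process.wait()


if __name__ == "__main__":
    # The Python interpreter stands in for `docker run ...` so the sketch
    # can be tried without a Docker daemon.
    asyncio.run(run_and_stream([sys.executable, "-c", "print('hello')"], print))
```

Merging stderr into stdout is a design choice here: it avoids the case where one pipe fills up while only the other is being read.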
3. Example usage in a flow:
```python
from prefect import flow


@flow
def docker_processing_flow():
    # Run a simple container
    result1 = run_docker_container(
        image="alpine:latest",
        command=["echo", "Hello from Docker!"]
    )
    print(f"Simple container output: {result1.stdout}")

    # Run a container with more options.
    # Note: postgres runs as a long-lived server, so this call blocks
    # until the container is stopped.
    result2 = run_docker_container(
        image="postgres:13",
        environment={
            "POSTGRES_PASSWORD": "secretpassword",
            "POSTGRES_DB": "testdb"
        },
        volumes=["/local/path:/container/path"],
        network="my-network"
    )
```
Marvin
07/15/2025, 8:09 PM
Key features of these implementations:
1. *Error Handling*: Both versions include proper error handling and will raise meaningful exceptions if the container fails.
2. *Flexibility*: The tasks accept various Docker options like:
- Environment variables
- Volume mappings
- Network configuration
- Custom commands
3. *Output Capture*: Both implementations capture and return the container's output.
4. *Container Cleanup*: The `--rm` flag ensures containers are removed after execution.
Some important considerations:
1. *Security*:
```python
import re

from prefect import task


# Be careful with user input - validate and sanitize any parameters
@task
def secure_docker_run(image: str):
    # Validate image name format.
    # Note: this simple pattern rejects registry or namespace prefixes
    # such as "ghcr.io/org/image".
    if not re.match(r'^[a-zA-Z0-9_.-]+(?::[a-zA-Z0-9_.-]+)?$', image):
        raise ValueError("Invalid image name")
    # ... rest of the implementation
```
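The validation idea above can be extended to accept registry and namespace prefixes and to reject unpinned images. The sketch below is a simplified approximation of Docker's image reference grammar, not a full implementation of it (digests such as `@sha256:...` are not handled). The names `IMAGE_REF` and `validate_image_ref` are hypothetical.

```python
import re

# Simplified approximation of Docker's image reference grammar
# (assumption: good enough as a pre-check, not a complete parser).
_REGISTRY = r"[a-zA-Z0-9](?:[a-zA-Z0-9.-]*[a-zA-Z0-9])?(?::[0-9]+)?"
_COMPONENT = r"[a-z0-9]+(?:[._-]+[a-z0-9]+)*"
_TAG = r"[A-Za-z0-9_][A-Za-z0-9_.-]{0,127}"
IMAGE_REF = re.compile(
    rf"(?:(?P<registry>{_REGISTRY})/)?"
    rf"(?P<path>{_COMPONENT}(?:/{_COMPONENT})*)"
    rf"(?::(?P<tag>{_TAG}))?"
)


def validate_image_ref(image: str, allow_latest: bool = False) -> str:
    """Return image unchanged if it looks valid, otherwise raise ValueError."""
    # fullmatch avoids the trailing-newline loophole of a "$" anchor.
    match = IMAGE_REF.fullmatch(image)
    if match is None:
        raise ValueError(f"Invalid image reference: {image!r}")
    tag = match.group("tag")
    if not allow_latest and tag in (None, "latest"):
        raise ValueError(f"Image must be pinned to an explicit tag: {image!r}")
    return image
```

With these rules, `ghcr.io/org/img:v1.2` and `localhost:5000/img:1.0` pass, while `alpine` and `alpine:latest` are rejected unless `allow_latest=True` is given.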
2. *Resource Management*:
```python
from prefect import task


@task
def resource_controlled_docker_run(
    image: str,
    memory_limit: str = "512m",
    cpu_limit: str = "0.5"
):
    docker_cmd = [
        "docker", "run",
        "--memory", memory_limit,
        "--cpus", cpu_limit,
        image
    ]
    # ... rest of the implementation
```
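If the limits come from user input or configuration, it may help to check them before building the command. A minimal sketch, assuming the same `--memory` and `--cpus` flags as above; the helper name `build_limited_cmd` is hypothetical, and the memory pattern deliberately accepts only a positive integer with an optional lowercase `b`, `k`, `m`, or `g` suffix.

```python
import math
import re
from typing import List

# Positive integer with an optional unit suffix, e.g. "512m" or "2g".
_MEMORY = re.compile(r"[1-9][0-9]*[bkmg]?")


def build_limited_cmd(
    image: str, memory_limit: str = "512m", cpu_limit: str = "0.5"
) -> List[str]:
    """Validate the limits and return the `docker run` argument list."""
    if _MEMORY.fullmatch(memory_limit) is None:
        raise ValueError(f"Invalid memory limit: {memory_limit!r}")
    try:
        cpus = float(cpu_limit)
    except ValueError:
        raise ValueError(f"Invalid CPU limit: {cpu_limit!r}") from None
    # Reject zero, negative, NaN, and infinite values.
    if not math.isfinite(cpus) or cpus <= 0:
        raise ValueError(f"Invalid CPU limit: {cpu_limit!r}")
    return [
        "docker", "run", "--rm",
        "--memory", memory_limit,
        "--cpus", cpu_limit,
        image,
    ]
```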
3. *Logging*:
```python
from prefect import task, get_run_logger


@task
def logged_docker_run(image: str):
    logger = get_run_logger()
    logger.info(f"Starting container with image: {image}")
    # ... implementation
    logger.info("Container completed successfully")
```
To use these tasks, make sure:
1. Docker is installed and running on the machine where the task will execute
2. The user running the Prefect flow has permissions to execute Docker commands
3. Any required images are either available locally or can be pulled from a registry
4. Network, volume, and environment configurations are properly set up
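The first two checks in this list can be automated before a flow starts. The sketch below is one possible preflight check, assuming that `docker info` exits with a non-zero status when the daemon is unreachable or the user lacks permission (which is the behavior of recent Docker versions, as far as I know). The function name `docker_available` is hypothetical.

```python
import shutil
import subprocess


def docker_available(timeout: float = 10.0) -> bool:
    """Return True if the docker CLI exists and can reach a running daemon."""
    if shutil.which("docker") is None:
        # The docker executable is not on PATH.
        return False
    try:
        result = subprocess.run(
            ["docker", "info"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return result.returncode == 0
```

A flow could call this at the start and fail fast with a clear message instead of surfacing a less obvious error from the first container launch.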
Would you like me to elaborate on any particular aspect, such as handling specific Docker options or implementing additional features?