matheus sampaio
08/04/2025, 4:11 AM
Marvin
08/04/2025, 4:11 AM
Marvin
08/04/2025, 4:12 AM
```python
from datetime import timedelta

from prefect import flow
from prefect.client.schemas.schedules import CronSchedule, IntervalSchedule


# Define your flow
@flow
def my_flow(name: str = "default"):
    print(f"Running flow with name: {name}")


if __name__ == "__main__":
    # Method 1: Deploy with an interval schedule
    my_flow.from_source(
        source=".",  # Use current directory as source
        entrypoint="path/to/your/script.py:my_flow",  # Path to your flow file and function
    ).deploy(
        name="my-scheduled-deployment",
        work_pool_name="my-work-pool",
        schedules=[IntervalSchedule(interval=timedelta(hours=1))],  # Run every hour
    )

    # Method 2: Deploy with a cron schedule
    my_flow.from_source(
        source=".",
        entrypoint="path/to/your/script.py:my_flow",
    ).deploy(
        name="my-cron-deployment",
        work_pool_name="my-work-pool",
        schedules=[CronSchedule(cron="0 * * * *")],  # Run at the start of every hour
    )

    # To run the flow immediately without creating a deployment
    my_flow(name="direct run")
```
Key points about this approach:
1. Direct Flow Runs: You can run flows directly by calling them like regular Python functions (`my_flow()`).
2. Deployment Creation: Use `flow.from_source()` to specify where your flow code lives, followed by `.deploy()` to create the deployment.
3. Scheduling Options:
- `IntervalSchedule`: For simple, regular intervals (every N minutes/hours/days)
- `CronSchedule`: For more complex scheduling patterns using cron syntax
- You can also use `RRuleSchedule` for advanced recurring patterns
4. Work Pools: You'll need to specify the name of the work pool where your flow will run. Make sure the work pool exists and has an appropriate worker running.
Important notes:
- Make sure you have a Prefect server running (`prefect server start`)
- Ensure you have a work pool created and a worker running for that work pool
- The `source="."` parameter assumes your code is in the current directory. Adjust this if your code is in a different location or repository
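To make the difference between the two schedule types concrete, here is a small standard-library sketch (it does not use Prefect) that lists the next few run times for an hourly interval and for the cron expression `0 * * * *`. The helper names `next_interval_runs` and `next_top_of_hour_runs` are made up for illustration, and the interval helper assumes runs are counted from the start time passed in, which only approximates how Prefect anchors interval schedules.

```python
from datetime import datetime, timedelta


def next_interval_runs(start: datetime, interval: timedelta, count: int) -> list[datetime]:
    """Runs spaced `interval` apart, counted from `start` (hypothetical helper)."""
    return [start + interval * (i + 1) for i in range(count)]


def next_top_of_hour_runs(start: datetime, count: int) -> list[datetime]:
    """Runs matching cron "0 * * * *": minute 0 of each hour after `start`."""
    first = start.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return [first + timedelta(hours=i) for i in range(count)]


start = datetime(2025, 8, 4, 4, 11)
interval_runs = next_interval_runs(start, timedelta(hours=1), 3)
cron_runs = next_top_of_hour_runs(start, 3)
print([t.strftime("%H:%M") for t in interval_runs])  # offsets keep the :11 minute
print([t.strftime("%H:%M") for t in cron_runs])      # always on the hour
```

The interval runs drift with whatever moment they are counted from, while the cron runs are pinned to the wall clock, which is usually the deciding factor between the two.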
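If you want to manage your deployments programmatically (like updating schedules or pausing them), you can use the Prefect client:
```python
import asyncio

from prefect.client.orchestration import get_client


async def main():
    async with get_client() as client:
        # These calls take a deployment ID, looked up here by "flow-name/deployment-name".
        # Check that your Prefect version provides pause_deployment/resume_deployment.
        deployment = await client.read_deployment_by_name("my-flow/my-scheduled-deployment")
        await client.pause_deployment(deployment.id)  # Pause a deployment
        await client.resume_deployment(deployment.id)  # Resume a deployment


asyncio.run(main())
```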
Would you like me to provide more specific examples for any of these aspects, such as using different types of schedules or managing deployments programmatically?
matheus sampaio
08/04/2025, 4:13 AM
Marvin
08/04/2025, 4:13 AM
Marvin
08/04/2025, 4:15 AM
```bash
prefect work-pool create my-docker-pool --type docker
```
5. Build and run your Docker container:
```bash
# Build the image
docker build -t your-image-name:tag .
# Run the container
docker run -d \
  -e PREFECT_API_URL="http://host.docker.internal:4200/api" \
  -e PREFECT_API_KEY="your-api-key-if-needed" \
  your-image-name:tag
```
Important considerations for Docker:
1. Network Configuration:
- If running Prefect server locally, use `host.docker.internal` instead of `localhost` in your `PREFECT_API_URL`
- Ensure proper port mappings if needed (`-p` flag in `docker run`)
2. Environment Variables:
- Set `PREFECT_API_URL` to point to your Prefect server
- Include any other environment variables your flows need
3. Volume Mounts:
- If your flows need access to local files, add volume mounts:
```bash
docker run -v $(pwd):/app ...
```
4. Docker Network:
- If running both Prefect server and your flows in Docker, create a Docker network:
```bash
docker network create prefect-network
docker run --network prefect-network ...
```
5. Persistence:
- Consider mounting volumes for any data that needs to persist between container restarts
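As a rough illustration of the network point above, the sketch below rewrites an API URL that points at `localhost` so that it can be used from inside a container. It uses only the standard library; the function name `api_url_for_container` is hypothetical, and the default alias assumes `host.docker.internal` resolves in your environment (on Linux this may need `--add-host=host.docker.internal:host-gateway`).

```python
from urllib.parse import urlsplit, urlunsplit

LOCAL_HOSTS = {"localhost", "127.0.0.1"}


def api_url_for_container(url: str, host_alias: str = "host.docker.internal") -> str:
    """Swap a loopback hostname for an alias that resolves to the Docker host."""
    parts = urlsplit(url)
    if parts.hostname not in LOCAL_HOSTS:
        return url  # already points somewhere a container can reach
    # Keep the port if one was given; credentials in the URL are not preserved
    netloc = host_alias if parts.port is None else f"{host_alias}:{parts.port}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))


print(api_url_for_container("http://localhost:4200/api"))
```

URLs that already use a service name, such as `http://prefect-server:4200/api`, pass through unchanged.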
Here's a `docker-compose.yml` alternative that ties it all together:
```yaml
version: '3'
services:
  flow-scheduler:
    build: .
    environment:
      - PREFECT_API_URL=http://prefect-server:4200/api
    networks:
      - prefect-network
    volumes:
      - .:/app
    depends_on:
      - prefect-server
  prefect-server:
    image: prefecthq/prefect:3-latest
    # Bind to 0.0.0.0 so other containers and the published port can reach the API
    command: prefect server start --host 0.0.0.0
    ports:
      - "4200:4200"
    networks:
      - prefect-network
networks:
  prefect-network:
    driver: bridge
```
Would you like me to elaborate on any of these aspects or provide more specific examples for your use case?
matheus sampaio
08/04/2025, 4:16 AM
Marvin
08/04/2025, 4:17 AM
Marvin
08/04/2025, 4:18 AM
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import WorkPoolCreate


async def create_work_pool():
    async with get_client() as client:
        # Create a process work pool
        process_pool = await client.create_work_pool(
            work_pool=WorkPoolCreate(name="my-process-pool", type="process")
        )
        # Create a Docker work pool
        docker_pool = await client.create_work_pool(
            work_pool=WorkPoolCreate(
                name="my-docker-pool",
                type="docker",
                # A base job template needs both "job_configuration" and "variables"
                base_job_template={
                    "job_configuration": {
                        "image": "prefecthq/prefect:3-latest",
                        "network_mode": "host",
                    },
                    "variables": {"type": "object", "properties": {}},
                },
            )
        )


# Run the async function
asyncio.run(create_work_pool())
```
You can also create a more comprehensive script that handles different work pool types and configurations:
```python
import asyncio
import os
from typing import Any, Dict, Optional

from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import WorkPoolCreate

# Empty variables schema: the templates below hard-code their settings
NO_VARIABLES = {"type": "object", "properties": {}}


async def create_work_pool(
    name: str,
    pool_type: str,
    base_job_template: Optional[Dict[str, Any]] = None,
    paused: bool = False,
):
    """
    Create a Prefect work pool with specified configuration.

    Args:
        name: Name of the work pool
        pool_type: Type of work pool ('process', 'docker', or 'kubernetes')
        base_job_template: Optional configuration for the work pool
        paused: Whether to create the work pool in a paused state
    """
    async with get_client() as client:
        try:
            # Default templates for different pool types.
            # Keys under "job_configuration" must match what the worker type accepts.
            default_templates = {
                "docker": {
                    "job_configuration": {
                        "image": "prefecthq/prefect:3-latest",
                        "network_mode": "host",
                    },
                    "variables": NO_VARIABLES,
                },
                "kubernetes": {
                    "job_configuration": {
                        "namespace": "default",
                        "image": "prefecthq/prefect:3-latest",
                    },
                    "variables": NO_VARIABLES,
                },
                "process": {},
            }
            # Use provided template or default
            template = base_job_template or default_templates.get(pool_type, {})
            work_pool = await client.create_work_pool(
                work_pool=WorkPoolCreate(
                    name=name,
                    type=pool_type,
                    base_job_template=template,
                    is_paused=paused,
                )
            )
            print(f"Successfully created work pool: {name} (Type: {pool_type})")
            return work_pool
        except Exception as e:
            print(f"Error creating work pool: {str(e)}")
            raise


# Example usage
async def main():
    # Create a process work pool
    await create_work_pool(name="my-process-pool", pool_type="process")
    # Create a Docker work pool with custom configuration
    await create_work_pool(
        name="my-docker-pool",
        pool_type="docker",
        base_job_template={
            "job_configuration": {
                "image": "your-custom-image:latest",
                "network_mode": "host",
                # Python does not expand "${pwd}", so build the path explicitly
                "volumes": [f"{os.getcwd()}:/opt/prefect/flows"],
                "env": {"EXTRA_PIP_PACKAGES": "pandas numpy"},
            },
            "variables": NO_VARIABLES,
        },
    )
    # Create a Kubernetes work pool
    await create_work_pool(
        name="my-k8s-pool",
        pool_type="kubernetes",
        base_job_template={
            "job_configuration": {
                "namespace": "prefect",
                "image": "your-custom-image:latest",
                "resource_requests": {"memory": "512Mi", "cpu": "500m"},
            },
            "variables": NO_VARIABLES,
        },
    )


if __name__ == "__main__":
    asyncio.run(main())
```
Marvin
08/04/2025, 4:18 AM
```python
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import WorkPoolUpdate


async def list_work_pools():
    async with get_client() as client:
        work_pools = await client.read_work_pools()
        for pool in work_pools:
            print(f"Pool: {pool.name} (Type: {pool.type})")


async def delete_work_pool(name: str):
    async with get_client() as client:
        await client.delete_work_pool(name)
        print(f"Deleted work pool: {name}")


async def pause_work_pool(name: str):
    async with get_client() as client:
        # update_work_pool takes a WorkPoolUpdate object rather than keyword flags
        await client.update_work_pool(name, WorkPoolUpdate(is_paused=True))
        print(f"Paused work pool: {name}")
```
Key points about work pools:
1. Types Available:
- `process`: For local execution
- `docker`: For Docker-based execution
- `kubernetes`: For Kubernetes-based execution
2. Base Job Templates:
- Can be customized based on your needs
- Include infrastructure-specific configuration
- Can be updated after creation
3. Error Handling:
- The script includes basic error handling
- You might want to add more specific error handling based on your needs
4. Async Operation:
- All operations are async and require proper async/await handling
- Use `asyncio.run()` to execute async functions from synchronous code
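The async point can be sketched without a Prefect server. In the example below, `fake_create_pool` is a hypothetical stand-in for an async client call; the part that carries over is the pattern of one synchronous entry point handing control to `asyncio.run()`.

```python
import asyncio


async def fake_create_pool(name: str) -> str:
    """Stand-in for an async client call (hypothetical; no Prefect involved)."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return f"created {name}"


async def create_all(names: list[str]) -> list[str]:
    # Run the coroutines concurrently and collect results in input order
    return await asyncio.gather(*(fake_create_pool(n) for n in names))


def main() -> list[str]:
    # Synchronous entry point: asyncio.run() starts and closes the event loop
    return asyncio.run(create_all(["my-process-pool", "my-docker-pool"]))


results = main()
print(results)
```

Calling `asyncio.run()` once at the top level, rather than once per operation, keeps all the awaited calls on a single event loop.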
Would you like me to provide more specific examples for any particular work pool type or configuration?
matheus sampaio
08/04/2025, 4:43 AM
Marvin
08/04/2025, 4:43 AM
Marvin
08/04/2025, 4:44 AM
1. Create a script that creates the work pool (`create_work_pool.py`):
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import WorkPoolCreate


async def create_work_pool():
    # Wait for Prefect server to be ready
    max_retries = 30
    for i in range(max_retries):
        try:
            async with get_client() as client:
                # Check if work pool already exists
                existing_pools = await client.read_work_pools()
                if not any(pool.name == "my-docker-pool" for pool in existing_pools):
                    await client.create_work_pool(
                        work_pool=WorkPoolCreate(
                            name="my-docker-pool",
                            type="docker",
                            base_job_template={
                                "job_configuration": {
                                    "image": "prefecthq/prefect:3-latest",
                                    "network_mode": "prefect-network",
                                },
                                # Required alongside "job_configuration"
                                "variables": {"type": "object", "properties": {}},
                            },
                        )
                    )
                    print("Successfully created Docker work pool")
                else:
                    print("Work pool already exists")
                break
        except Exception as e:
            if i == max_retries - 1:
                print(f"Failed to create work pool after {max_retries} attempts: {str(e)}")
                raise
            print(f"Waiting for Prefect server to be ready... (attempt {i+1}/{max_retries})")
            await asyncio.sleep(2)  # non-blocking sleep inside a coroutine


if __name__ == "__main__":
    asyncio.run(create_work_pool())
```
2. Create a `Dockerfile` for the work pool creation:
```dockerfile
FROM python:3.9-slim
WORKDIR /app
# Quote the requirement so the shell does not treat ">" as a redirect
RUN pip install "prefect>=3.0.0"
COPY create_work_pool.py .
CMD ["python", "create_work_pool.py"]
```
3. Create your `docker-compose.yml`:
```yaml
version: '3.8'
services:
  prefect-server:
    image: prefecthq/prefect:3-latest
    restart: always
    command: prefect server start
    ports:
      - "4200:4200"
    environment:
      - PREFECT_SERVER_API_HOST=0.0.0.0
      - PREFECT_API_URL=http://prefect-server:4200/api
    networks:
      - prefect-network
    healthcheck:
      # Uses Python for the probe because curl may not be installed in the image
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:4200/api/health')"]
      interval: 10s
      timeout: 5s
      retries: 5
  work-pool-creator:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      - PREFECT_API_URL=http://prefect-server:4200/api
    networks:
      - prefect-network
    depends_on:
      prefect-server:
        condition: service_healthy
  prefect-worker:
    # A Docker-type work pool needs the prefect-docker package available in this image
    image: prefecthq/prefect:3-latest
    restart: always
    environment:
      - PREFECT_API_URL=http://prefect-server:4200/api
    command: ["prefect", "worker", "start", "--pool", "my-docker-pool"]
    networks:
      - prefect-network
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      work-pool-creator:
        condition: service_completed_successfully
networks:
  prefect-network:
    name: prefect-network
```
To use this setup:
1. Put all these files in the same directory:
```
your-project/
├── docker-compose.yml
├── Dockerfile
└── create_work_pool.py
```
2. Start everything with:
```bash
docker-compose up -d
```
This setup will:
1. Start the Prefect server
2. Create the work pool (waiting for the server to be ready)
3. Start a worker for that work pool
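The waiting in step 2 boils down to a retry loop. A minimal sketch of that loop, separated from Prefect so it can be run anywhere, might look like this. The function name `wait_until_ready` and its parameters are illustrative; the defaults of 30 attempts and a 2-second delay mirror the script above, and the `check` callable is whatever readiness probe you choose.

```python
import time
from typing import Callable


def wait_until_ready(
    check: Callable[[], bool],
    max_retries: int = 30,
    delay_seconds: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call `check` until it returns True; return the number of attempts used."""
    for attempt in range(1, max_retries + 1):
        try:
            if check():
                return attempt
        except OSError:
            pass  # treat connection errors as "not ready yet"
        if attempt < max_retries:
            sleep(delay_seconds)
    raise TimeoutError(f"not ready after {max_retries} attempts")


# Simulated server that becomes healthy on the third probe
responses = iter([False, False, True])
attempts = wait_until_ready(lambda: next(responses), sleep=lambda _: None)
print(attempts)
```

Passing `sleep` as a parameter is only a convenience for trying the loop without real delays; in the container you would leave the default.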
Some important notes about this setup:
1. Network Configuration:
- All services are on the `prefect-network` network
- The server is accessible on port 4200
2. Dependencies:
- The work pool creator waits for the server to be healthy
- The worker waits for the work pool creator to finish
3. Persistence:
- The worker has access to the Docker socket to spawn containers
- You might want to add volume mounts for persistent data
4. Environment Variables:
- `PREFECT_API_URL` is set to point to the server
- You can add more environment variables as needed
Marvin
08/04/2025, 4:44 AM
```bash
prefect config set PREFECT_API_URL=http://localhost:4200/api
```
You can then deploy flows to this setup using the Python code we discussed earlier:
```python
my_flow.from_source(
    source=".",
    entrypoint="your_script.py:my_flow",
).deploy(
    name="my-docker-deployment",
    work_pool_name="my-docker-pool",
)
```
Would you like me to explain any part of this setup in more detail or show you how to add additional configurations?