matheus sampaio (07/31/2025, 8:32 AM)

Marvin (07/31/2025, 8:34 AM)

python
from prefect import flow, serve


@flow
async def flow_1():
    print("Running flow 1")
    # Your flow 1 logic here


@flow
async def flow_2():
    print("Running flow 2")
    # Your flow 2 logic here


@flow
async def flow_3():
    print("Running flow 3")
    # Your flow 3 logic here


if __name__ == "__main__":
    # Build a deployment for each flow with its own cron schedule
    deployment_1 = flow_1.to_deployment(
        name="flow-1-deployment",
        cron="*/5 * * * *",  # Every 5 minutes
    )
    deployment_2 = flow_2.to_deployment(
        name="flow-2-deployment",
        cron="0 * * * *",  # Every hour
    )
    deployment_3 = flow_3.to_deployment(
        name="flow-3-deployment",
        cron="0 0 * * *",  # Once a day at midnight
    )

    # serve() runs all three deployments in a single long-lived process
    serve(deployment_1, deployment_2, deployment_3)
Key points about this implementation:
1. Async Flow Definitions: Each flow is defined with the async keyword since we're working with asynchronous operations.
2. Cron Schedules: Each deployment gets its own cron expression:
- */5 * * * * (every 5 minutes)
- 0 * * * * (every hour)
- 0 0 * * * (daily at midnight)
3. Deployment Creation: Each flow is turned into a scheduled deployment using the .to_deployment() pattern.
4. Concurrent Serving: A single serve() call runs all three deployments concurrently in one process.
To run this setup:
1. First, make sure you have a Prefect server running:
bash
prefect server start
2. In a separate terminal, run your Python script:
bash
python your_script.py
Important Notes:
- Make sure you have Prefect 3.x installed (pip install prefect)
- The flows will run according to their respective cron schedules
- Each flow runs independently of the others
- The script will keep running until interrupted, continuously serving all flows
- You can monitor your flows through the Prefect UI at http://localhost:4200 (default)
If you need to modify the cron schedules, you can refer to standard cron syntax. For example:
- * * * * * (every minute)
- */15 * * * * (every 15 minutes)
- 0 9 * * * (every day at 9 AM)
- 0 9 * * 1-5 (every weekday at 9 AM)
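As a quick sketch, any of these expressions can be passed straight to serve() or to_deployment() via the cron argument (the flow and deployment names here are illustrative):
python
from prefect import flow


@flow
def weekday_report():
    print("Running the weekday report")


if __name__ == "__main__":
    # Cron fields: minute hour day-of-month month day-of-week
    weekday_report.serve(name="weekday-9am", cron="0 9 * * 1-5")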
Would you like me to explain any part of this in more detail or show you how to set up different types of schedules?

matheus sampaio (07/31/2025, 8:36 AM)
Marvin (07/31/2025, 8:38 AM)

bash
# Create a work queue, then cap it at one concurrent run
prefect work-queue create "sequential-queue"
prefect work-queue set-concurrency-limit "sequential-queue" 1
Key considerations:
1. Concurrent execution (default):
- Flows run independently
- Better resource utilization
- Faster overall execution if flows don't depend on each other
2. Sequential execution:
- Flows wait for previous ones to complete
- Better for dependent workflows
- More predictable resource usage
- Useful when flows need to run in a specific order
3. Mixed approach:
- You can group certain flows to run sequentially while others run concurrently
- Use different concurrency contexts or work queues for different groups of flows, as sketched below
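For example, a minimal sketch of the mixed approach using work queues (the pool and queue names are illustrative):
bash
# One pool for all flows
prefect work-pool create "my-pool" --type process

# Group A: flows that must run one at a time
prefect work-queue create "sequential-queue" --pool "my-pool"
prefect work-queue set-concurrency-limit "sequential-queue" 1

# Group B: flows that may run concurrently (no limit set)
prefect work-queue create "concurrent-queue" --pool "my-pool"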
Would you like me to elaborate on any of these approaches or show you how to implement a specific execution pattern for your use case?

matheus sampaio (07/31/2025, 8:39 AM)
Marvin (07/31/2025, 8:40 AM)

1. Concurrency Context (global concurrency limits):

python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency


@task
async def process_data(item):
    # Simulate some processing
    await asyncio.sleep(1)
    print(f"Processing {item}")


@flow
async def concurrent_flow():
    # Each call occupies one slot of the "process-data" global
    # concurrency limit, so at most 2 run at once if the limit
    # was created with: prefect gcl create process-data --limit 2
    async def limited(item):
        async with concurrency("process-data", occupy=1):
            await process_data(item)

    await asyncio.gather(*[limited(item) for item in range(5)])


# Run the flow
if __name__ == "__main__":
    asyncio.run(concurrent_flow())
2. Tag-Based Concurrency (More flexible):
python
import asyncio

from prefect import flow, task, tags


@task
async def database_operation(id: int):
    print(f"Database operation {id}")
    await asyncio.sleep(1)


@task
async def api_operation(id: int):
    print(f"API operation {id}")
    await asyncio.sleep(1)


@flow
async def multi_resource_flow():
    # Task runs created inside a tags() context inherit those tags;
    # enforce per-tag task-run limits with, e.g.:
    #   prefect concurrency-limit create database 2
    #   prefect concurrency-limit create api 3
    with tags("database"):
        await asyncio.gather(*[database_operation(i) for i in range(5)])

    with tags("api"):
        await asyncio.gather(*[api_operation(i) for i in range(5)])


# Run the flow
if __name__ == "__main__":
    asyncio.run(multi_resource_flow())
3. Deployment-Level Concurrency (For scheduled flows):
python
import asyncio

from prefect import flow


@flow
async def flow_with_deployment_concurrency():
    print("Running flow with deployment concurrency")
    await asyncio.sleep(2)


if __name__ == "__main__":
    # First, create the work queue and cap it via the CLI:
    #   prefect work-queue create "limited-queue" --pool "my-pool"
    #   prefect work-queue set-concurrency-limit "limited-queue" 2
    flow_with_deployment_concurrency.from_source(
        source=".",  # Current directory
        entrypoint="this_file.py:flow_with_deployment_concurrency",  # Replace with the actual path to this file
    ).deploy(
        name="concurrent-deployment",
        work_pool_name="my-pool",  # The pool must already exist
        work_queue_name="limited-queue",  # Runs are throttled by the queue's limit
        cron="*/5 * * * *",
    )
4. Mixed Concurrency Patterns (Complex scenarios):
python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency


@task
async def heavy_computation(id: int):
    print(f"Computing {id}")
    await asyncio.sleep(2)


@task
async def data_processing(id: int):
    print(f"Processing data {id}")
    await asyncio.sleep(1)


@flow
async def complex_flow():
    # Each task occupies one slot of a named global concurrency limit;
    # create the limits first, e.g.:
    #   prefect gcl create compute --limit 2
    #   prefect gcl create data --limit 3
    async def limited_compute(i):
        async with concurrency("compute", occupy=1):
            await heavy_computation(i)

    async def limited_data(i):
        async with concurrency("data", occupy=1):
            await data_processing(i)

    await asyncio.gather(*[limited_compute(i) for i in range(4)])

    # Different concurrency limit for data processing
    await asyncio.gather(*[limited_data(i) for i in range(6)])


@flow
async def parent_flow():
    # With a "parent" limit of 1, only one complex_flow run holds the
    # slot at a time, so the subflows execute one after another
    async with concurrency("parent", occupy=1):
        for _ in range(3):
            await complex_flow()


# Run the parent flow
if __name__ == "__main__":
    asyncio.run(parent_flow())
Key points about these implementations:
1. Concurrency Context:
- Uses the concurrency context manager to occupy slots of a named global concurrency limit
- Can be applied at task or flow level
- Has both async (prefect.concurrency.asyncio) and sync (prefect.concurrency.sync) variants; see the sketch below
2. Tags:
- Useful for grouping related operations
- Can be combined with per-tag task-run limits (prefect concurrency-limit create <tag> <limit>)
- Allow for flexible resource management
3. Work Queues:
- Control concurrency at deployment level

bash
# Create a work queue, then set its concurrency limit
prefect work-queue create "my-queue"
prefect work-queue set-concurrency-limit "my-queue" 2
# Monitor active runs
prefect work-queue preview "my-queue"
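For completeness, a minimal sketch of the synchronous variant (assuming a "db" global concurrency limit has been created beforehand, e.g. with prefect gcl create db --limit 2):
python
from prefect import flow, task
from prefect.concurrency.sync import concurrency


@task
def write_row(i: int):
    # Hold one slot of the "db" limit while writing
    with concurrency("db", occupy=1):
        print(f"Writing row {i}")


@flow
def sync_flow():
    for i in range(5):
        write_row(i)


if __name__ == "__main__":
    sync_flow()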
Would you like me to explain any of these patterns in more detail or show you how to implement a specific concurrency scenario?

matheus sampaio (07/31/2025, 8:47 AM)

Marvin (07/31/2025, 8:51 AM)

bash
# Create a work pool
prefect work-pool create "my-processes" --type process
# Create work queues (optional; they'll be created automatically if they don't exist)
prefect work-queue create "morning-queue" --pool "my-processes"
prefect work-queue create "midday-queue" --pool "my-processes"
prefect work-queue create "evening-queue" --pool "my-processes"
# Start a worker to process the flows
prefect worker start --pool "my-processes"
Key benefits of this approach:
1. Natural Isolation: Each flow runs in its own work queue, preventing interference
2. Resource Management: Work pools handle resource allocation automatically
3. Scalability: Easy to add more flows or modify schedules without worrying about conflicts
4. Monitoring: Better visibility into flow execution through the Prefect UI
You don't need concurrency controls in this case because:
- The flows run at different times
- Work pools and queues already provide isolation
- The scheduling system handles timing without blocking other flows
If you later need to run multiple instances of the same flow concurrently, you can add concurrency limits to specific work queues:
bash
prefect work-queue set-concurrency-limit "morning-queue" 2
Would you like me to explain any part of this setup in more detail or show you how to implement specific scheduling patterns for your flows?

matheus sampaio (07/31/2025, 8:52 AM)

Marvin (07/31/2025, 8:53 AM)

Here's a complete example file (scheduled_flows.py) that sets up multiple scheduled flows with proper organization and best practices:
python
import asyncio
import sys
from datetime import datetime

from prefect import flow


# Define your flows
@flow(log_prints=True)
async def morning_etl():
    print(f"Running morning ETL process at {datetime.now()}")
    # Your morning ETL logic here
    await asyncio.sleep(2)  # Simulate work


@flow(log_prints=True)
async def midday_reporting():
    print(f"Running midday reporting at {datetime.now()}")
    # Your midday reporting logic here
    await asyncio.sleep(2)  # Simulate work


@flow(log_prints=True)
async def evening_cleanup():
    print(f"Running evening cleanup at {datetime.now()}")
    # Your evening cleanup logic here
    await asyncio.sleep(2)  # Simulate work


def create_deployments():
    """Create deployments for all flows"""
    # Define the work pool name (created in step 2 below)
    work_pool_name = "my-processes"

    # Morning ETL deployment; deploy() returns the deployment ID
    morning_id = morning_etl.from_source(
        source=".",  # Current directory
        entrypoint="scheduled_flows.py:morning_etl",  # Path to flow function
    ).deploy(
        name="morning-etl",
        work_pool_name=work_pool_name,
        work_queue_name="morning-queue",
        cron="0 8 * * *",  # 8 AM daily (UTC by default)
    )
    print(f"Created morning ETL deployment: {morning_id}")

    # Midday reporting deployment
    midday_id = midday_reporting.from_source(
        source=".",
        entrypoint="scheduled_flows.py:midday_reporting",
    ).deploy(
        name="midday-reporting",
        work_pool_name=work_pool_name,
        work_queue_name="midday-queue",
        cron="0 12 * * *",  # 12 PM daily
    )
    print(f"Created midday reporting deployment: {midday_id}")

    # Evening cleanup deployment
    evening_id = evening_cleanup.from_source(
        source=".",
        entrypoint="scheduled_flows.py:evening_cleanup",
    ).deploy(
        name="evening-cleanup",
        work_pool_name=work_pool_name,
        work_queue_name="evening-queue",
        cron="0 20 * * *",  # 8 PM daily
    )
    print(f"Created evening cleanup deployment: {evening_id}")


# For testing flows locally
async def test_flows():
    """Test all flows locally"""
    print("Testing flows...")
    await morning_etl()
    await midday_reporting()
    await evening_cleanup()


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "test":
        # Run flow tests
        print("Running flow tests...")
        asyncio.run(test_flows())
    else:
        # Create deployments
        print("Creating deployments...")
        create_deployments()
To use this setup:
1. First, make sure you have a Prefect server running:
bash
prefect server start
2. In a new terminal, create the work pool:
bash
prefect work-pool create "my-processes" --type process
3. Deploy your flows:
bash
python scheduled_flows.py
4. Start a worker to run the flows:
bash
prefect worker start --pool "my-processes"
5. (Optional) Test your flows without deploying:
bash
python scheduled_flows.py test
Key features of this setup:
1. Organization:
- All flows are in one file
- Clear separation between flow definitions and deployment configuration
- Easy to maintain and modify
2. Flexibility:
- Each flow has its own schedule
- Separate work queues for different types of work
- Easy to add new flows or modify existing ones
3. Testing:
- Includes a test mode to run flows locally
4. Monitoring:
- log_prints=True ensures flow prints appear in the Prefect UI
- Each flow logs its execution time
- Separate work queues make it easy to monitor different types of work
To modify schedules or add new flows:
1. Add a new flow function with the @flow decorator
2. Add its deployment configuration in create_deployments()
3. Add it to the test_flows() function if needed
4. Redeploy by running the script again (a sketch follows below)
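For instance, a hypothetical nightly_backup flow could be added to scheduled_flows.py like this (the flow name, queue, and schedule are illustrative):
python
@flow(log_prints=True)
async def nightly_backup():
    print(f"Running nightly backup at {datetime.now()}")


# ...and inside create_deployments():
nightly_backup.from_source(
    source=".",
    entrypoint="scheduled_flows.py:nightly_backup",
).deploy(
    name="nightly-backup",
    work_pool_name="my-processes",
    work_queue_name="evening-queue",
    cron="0 2 * * *",  # 2 AM daily
)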
You can monitor your flows in the Prefect UI at http://localhost:4200 (default).
Would you like me to explain any part of this setup in more detail or show you how to add specific features to your flows?