Amith M
08/07/2025, 6:54 AMMarvin
08/07/2025, 6:54 AMMarvin
08/07/2025, 6:55 AMpython
from prefect import get_client
from datetime import datetime, timedelta
async def schedule_flow_run():
# Create a client
client = get_client()
# Create a flow run from a deployment
flow_run = await client.create_flow_run_from_deployment(
deployment_id="your-deployment-id", # Required: The deployment ID
scheduled_time=datetime.now() + timedelta(minutes=5), # Optional: When to schedule the run
name="my-scheduled-run", # Optional: Name for the flow run
parameters={"param1": "value1"}, # Optional: Parameters for the flow
tags=["scheduled"], # Optional: Tags for the flow run
)
return flow_run
Key points about scheduling flow runs:
1. Required Parameters:
- `deployment_id`: The UUID of the deployment you want to run (required)
2. Optional Parameters for Scheduling:
- `scheduled_time`: When you want the flow to run (datetime object)
- `name`: A custom name for the flow run
- `parameters`: Dictionary of parameters to pass to the flow
- `tags`: List of tags to apply to the flow run
- `job_variables`: Dictionary of variables specific to this run
- `work_queue_name`: Specific work queue to use
3. Important Notes:
- The client methods are async, so you need to use them with await
- If no scheduled_time
is provided, the flow run will be created in a 'SCHEDULED' state and will run according to the deployment's configuration
- The flow run will inherit any configuration from the deployment if not explicitly overridden
4. Using in Synchronous Code:
If you're in a synchronous context, you can use it like this:
python
from asyncio import run
def schedule_flow_run_sync():
return run(schedule_flow_run())
The method will return a FlowRun object containing details about the scheduled run, including its ID and status.
For more details, you can refer to the Prefect API documentation.
Let me know if you need any clarification or have questions about specific parameters!Amith M
08/07/2025, 6:56 AMAmith M
08/07/2025, 6:57 AMMarvin
08/07/2025, 6:57 AMMarvin
08/07/2025, 7:00 AMpython
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import IntervalSchedule
from datetime import timedelta
@flow
def your_flow():
# Your flow logic here
pass
# Deploy with an interval schedule
deployment = flow.from_source(
source="your_source",
name="recursive-flow",
).set_schedule(IntervalSchedule(interval=timedelta(hours=1)))
deployment.deploy()
2. Self-Scheduling Pattern (Alternative Approach)
python
from prefect import flow, get_client
from datetime import datetime, timedelta
@flow
async def self_scheduling_flow():
# Your flow logic here
# Schedule the next run
client = get_client()
next_run_time = datetime.now() + timedelta(hours=1)
await client.create_flow_run_from_deployment(
deployment_id="your-deployment-id",
scheduled_time=next_run_time,
parameters={"your": "parameters"},
)
3. Using RRule for Complex Recurring Patterns
python
from prefect.server.schemas.schedules import RRuleSchedule
from datetime import datetime
schedule = RRuleSchedule(
rrule="FREQ=DAILY;INTERVAL=1;COUNT=7", # Runs daily for 7 days
timezone="UTC"
)
deployment = flow.from_source(
source="your_source",
name="complex-recursive-flow",
).set_schedule(schedule)
deployment.deploy()
Important Considerations:
1. Best Practices:
- Prefer using built-in scheduling mechanisms over self-scheduling when possible
- Use the appropriate schedule type for your needs:
- `IntervalSchedule`: For fixed time intervals
- `CronSchedule`: For cron-like scheduling
- `RRuleSchedule`: For complex recurring patterns
2. Limitations:
- Self-scheduling patterns might lead to drift over time
- There's no built-in mechanism to prevent duplicate schedules
- Self-scheduling flows need error handling to prevent scheduling failures
3. Monitoring:
- Monitor your flow runs to ensure they're executing as expected
- Use tags to track recursively scheduled runs
- Set up notifications for scheduling failures
4. Error Handling:
python
@flow
async def robust_self_scheduling_flow():
try:
# Your flow logic here
client = get_client()
next_run_time = datetime.now() + timedelta(hours=1)
await client.create_flow_run_from_deployment(
deployment_id="your-deployment-id",
scheduled_time=next_run_time,
tags=["self-scheduled"],
parameters={"your": "parameters"},
)
except Exception as e:
# Handle scheduling errors
logger = get_run_logger()
logger.error(f"Failed to schedule next run: {e}")
raise
The recommended approach is to use Prefect's built-in scheduling mechanisms (Option 1) as they are more reliable and maintainable. Self-scheduling (Option 2) should only be used when you need very specific dynamic scheduling logic that can't be achieved with built-in schedules.
Would you like me to elaborate on any of these approaches or explain how to implement a specific scheduling pattern?Amith M
08/07/2025, 7:01 AMMarvin
08/07/2025, 7:01 AMMarvin
08/07/2025, 7:02 AMpython
from prefect import flow
from prefect.deployments import run_deployment
from datetime import datetime, timedelta
# Option 1: Using run_deployment
async def schedule_flow_run():
next_run_time = datetime.now() + timedelta(hours=1)
deployment_run = await run_deployment(
name="your-deployment/your-deployment-name",
scheduled_time=next_run_time,
parameters={"your": "parameters"},
tags=["scheduled"]
)
return deployment_run
# Option 2: If you need to use create_flow_run_from_deployment,
# you'll need to set the state directly
from prefect import get_client
from prefect.states import Scheduled
async def schedule_flow_run_alternative():
client = get_client()
next_run_time = datetime.now() + timedelta(hours=1)
flow_run = await client.create_flow_run_from_deployment(
deployment_id="your-deployment-id",
state=Scheduled(scheduled_time=next_run_time),
parameters={"your": "parameters"},
tags=["scheduled"]
)
return flow_run
For a self-scheduling recursive flow, you could do something like this:
python
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from datetime import datetime, timedelta
@flow
async def recursive_flow():
logger = get_run_logger()
# Your flow logic here
<http://logger.info|logger.info>("Executing flow logic...")
# Schedule the next run
try:
next_run_time = datetime.now() + timedelta(hours=1)
deployment_run = await run_deployment(
name="your-deployment/recursive-flow",
scheduled_time=next_run_time,
parameters={"your": "parameters"},
tags=["self-scheduled"]
)
<http://logger.info|logger.info>(f"Successfully scheduled next run for {next_run_time}")
except Exception as e:
logger.error(f"Failed to schedule next run: {e}")
raise
Key points:
1. Use run_deployment
instead of create_flow_run_from_deployment
when you want to directly specify a scheduled time
2. If you need to use create_flow_run_from_deployment
, you'll need to create a Scheduled
state object
3. Always include error handling for scheduling operations
4. Use tags to track self-scheduled runs
5. Consider using logging to track scheduling success/failure
Would you like me to explain any part of this in more detail or show you how to implement a specific scheduling pattern?