Janghoo Lee
06/14/2024, 7:42 AMMarvin
06/14/2024, 7:42 AMprefect.yaml
file, along with a basic flow example.
### Schedule Configuration:
You can define the schedule in the prefect.yaml
like this:
yaml
deployments:
...
schedules:
- cron: "*/5 * * * *"
timezone: "UTC"
active: true
...
This cron expression */5 * * * *
ensures that the flow runs every 5 minutes.
### Example Flow:
Here’s a basic Prefect flow:
python
from prefect import flow, task
@task(task_run_name="Adding {x} and {y}")
def add(x: int, y: int) -> int:
"""Just a Python function with a fancy hat on!"""
return x + y
@flow
def my_flow():
"""Tasks must be called in the flow to be executed (at this time)"""
first_result = add(1, 2)
second_result = add(first_result, first_result)
return add(first_result, second_result)
if __name__ == "__main__":
# Run the flow
my_flow()
# You can run tasks' functions directly outside of a flow if you want
# but it won't be tracked by Prefect, it's just a normal function
assert add.fn(1, 2) == 3
### Useful Links:
1. Prefect Schedules
2. Prefect Code Example
Feel free to use these references, and if you need any further clarification, let me know!Janghoo Lee
06/14/2024, 7:47 AMMarvin
06/14/2024, 7:47 AM.yaml
files for scheduling, you can programmatically schedule your flows using Prefect's Python API.
### Programmatic Scheduling with Python
Here’s how you can create a schedule programmatically using the CronSchedule
class:
python
from prefect import flow, task
from prefect.deployments import Deployment
from prefect.schedules import CronSchedule
@task
def add(x: int, y: int) -> int:
return x + y
@flow
def my_flow():
first_result = add(1, 2)
second_result = add(first_result, first_result)
return add(first_result, second_result)
# Create a deployment with a cron schedule
deployment = Deployment.build_from_flow(
flow=my_flow,
name="cron-scheduled-flow",
schedule=CronSchedule(cron="*/5 * * * *"), # runs every 5 minutes
)
if __name__ == "__main__":
deployment.apply()
# Optionally: Trigger the flow to run immediately for testing
my_flow()
### Explanation
- Flow Definition: Define your flow and tasks as usual.
- CronSchedule: Use CronSchedule
to create a schedule that runs every 5 minutes using the cron expression */5 * * * *
.
- Deployment: Use Deployment.build_from_flow
to link the schedule with your flow.
- Apply Deployment: Apply the deployment which starts the flow scheduler.
### Useful Links
- Prefect Schedules Documentation
- Prefect CronSchedule Documentation
Feel free to use these references to tweak your setup further. Let me know if you need more assistance!Nate
06/14/2024, 3:51 PMMarvin
06/14/2024, 3:51 PMserve
method to create a scheduled flow using a cron expression without relying on a .yaml
file. Below is an example demonstrating how to set this up:
### Example Flow with serve
and Cron Schedule
python
from prefect import flow, task
@task
def add(x: int, y: int) -> int:
return x + y
@flow
def my_flow():
first_result = add(1, 2)
second_result = add(first_result, first_result)
return add(first_result, second_result)
if __name__ == "__main__":
# Use serve to create a cron schedule for the flow
my_flow.serve(name="cron-scheduled-flow", cron="*/5 * * * *")
### Explanation
- Flow and Task Definition: Define your tasks and flow as usual.
- Cron Schedule: Use the serve
method with the cron
parameter to specify a cron expression (*/5 * * * *
for running every 5 minutes).
### Applying the Deployment
Run the script to apply the deployment and start the scheduler as follows:
sh
python script_name.py
This will schedule your flow to run every 5 minutes according to the cron expression provided.
### Useful Links
- Prefect Schedules Documentation
- Prefect `serve` Method Example (Documentation Reference)
Feel free to adapt the example to suit your needs. Let me know if you have more questions or need further assistance!