Janghoo Lee
06/14/2024, 7:42 AM
Marvin
06/14/2024, 7:42 AM
`prefect.yaml` file, along with a basic flow example.
### Schedule Configuration:
You can define the schedule in `prefect.yaml` like this:
```yaml
deployments:
  ...
  schedules:
    - cron: "*/5 * * * *"
      timezone: "UTC"
      active: true
  ...
```
The cron expression `*/5 * * * *` ensures that the flow runs every 5 minutes.
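To make the cron expression above more concrete, here is a minimal standard-library sketch that splits it into its five fields and expands the minute field. The helper names `split_cron` and `expand_minute_field` are hypothetical and only handle the `*` and `*/N` forms; this is an illustration of the syntax, not how Prefect parses schedules internally.

```python
# Names of the five standard cron fields, in order.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expr: str) -> dict[str, str]:
    """Map each whitespace-separated cron field to its name (hypothetical helper)."""
    parts = expr.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


def expand_minute_field(field: str) -> list[int]:
    """Expand a '*' or '*/N' minute field into the minutes it matches.

    Ranges and comma lists are deliberately not handled in this sketch.
    """
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        step = int(field[2:])
        if step <= 0:
            raise ValueError("step must be positive")
        return list(range(0, 60, step))
    raise ValueError(f"unsupported minute field: {field!r}")


fields = split_cron("*/5 * * * *")
minutes = expand_minute_field(fields["minute"])
print(fields)
print(minutes)
```

The minute field matches minutes 0, 5, 10, and so on up to 55, while the remaining four `*` fields place no restriction on hour, day, month, or weekday.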
### Example Flow:
Here’s a basic Prefect flow:
```python
from prefect import flow, task


@task(task_run_name="Adding {x} and {y}")
def add(x: int, y: int) -> int:
    """Just a Python function with a fancy hat on!"""
    return x + y


@flow
def my_flow():
    """Tasks must be called in the flow to be executed (at this time)"""
    first_result = add(1, 2)
    second_result = add(first_result, first_result)
    return add(first_result, second_result)


if __name__ == "__main__":
    # Run the flow
    my_flow()

    # You can run tasks' functions directly outside of a flow if you want
    # but it won't be tracked by Prefect, it's just a normal function
    assert add.fn(1, 2) == 3
```
### Useful Links:
1. Prefect Schedules
2. Prefect Code Example
Feel free to use these references, and if you need any further clarification, let me know!
Janghoo Lee
06/14/2024, 7:47 AM
Marvin
06/14/2024, 7:47 AM
`.yaml` files for scheduling, you can programmatically schedule your flows using Prefect's Python API.
### Programmatic Scheduling with Python
Here’s how you can create a schedule programmatically using the `CronSchedule` class:
```python
from prefect import flow, task
from prefect.deployments import Deployment
# In Prefect 2.x (the version that provides Deployment.build_from_flow),
# CronSchedule is imported from the schemas module, not prefect.schedules
from prefect.server.schemas.schedules import CronSchedule


@task
def add(x: int, y: int) -> int:
    return x + y


@flow
def my_flow():
    first_result = add(1, 2)
    second_result = add(first_result, first_result)
    return add(first_result, second_result)


# Create a deployment with a cron schedule
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="cron-scheduled-flow",
    schedule=CronSchedule(cron="*/5 * * * *"),  # runs every 5 minutes
)

if __name__ == "__main__":
    # Register the deployment and its schedule with the Prefect API
    deployment.apply()

    # Optionally: Trigger the flow to run immediately for testing
    my_flow()
```
### Explanation
- Flow Definition: Define your flow and tasks as usual.
- CronSchedule: Use `CronSchedule` to create a schedule that runs every 5 minutes using the cron expression `*/5 * * * *`.
- Deployment: Use `Deployment.build_from_flow` to link the schedule with your flow.
- Apply Deployment: Calling `deployment.apply()` registers the deployment and its schedule with the Prefect API. A running agent or worker is still needed to pick up and execute the scheduled runs.
### Useful Links
- Prefect Schedules Documentation
- Prefect CronSchedule Documentation
Feel free to use these references to tweak your setup further. Let me know if you need more assistance!
Nate
06/14/2024, 3:51 PM
Marvin
06/14/2024, 3:51 PM
`serve` method to create a scheduled flow using a cron expression without relying on a `.yaml` file. Below is an example demonstrating how to set this up:
### Example Flow with `serve` and Cron Schedule
```python
from prefect import flow, task


@task
def add(x: int, y: int) -> int:
    return x + y


@flow
def my_flow():
    first_result = add(1, 2)
    second_result = add(first_result, first_result)
    return add(first_result, second_result)


if __name__ == "__main__":
    # Use serve to create a cron schedule for the flow
    my_flow.serve(name="cron-scheduled-flow", cron="*/5 * * * *")
```
### Explanation
- Flow and Task Definition: Define your tasks and flow as usual.
- Cron Schedule: Use the `serve` method with the `cron` parameter to specify a cron expression (`*/5 * * * *` for running every 5 minutes).
### Applying the Deployment
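Run the script to create the deployment and start serving it:
```sh
python script_name.py
```
This will schedule your flow to run every 5 minutes according to the cron expression provided. The script keeps running and executes the scheduled runs itself, so the process must stay alive for the schedule to take effect.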
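One detail worth spelling out is that a `*/5` cron schedule fires on wall-clock minutes that are multiples of 5, not 5 minutes after the script was started. The standard-library sketch below illustrates this for a hypothetical start time; `next_runs` is an illustrative helper that assumes only the minute field is restricted, and it is not part of Prefect.

```python
from datetime import datetime, timedelta, timezone


def next_runs(start: datetime, step_minutes: int = 5, count: int = 3) -> list[datetime]:
    """Return the next `count` times after `start` whose minute is a multiple of the step."""
    # Drop seconds, then begin at the first whole minute strictly after `start`.
    candidate = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    runs = []
    while len(runs) < count:
        if candidate.minute % step_minutes == 0:
            runs.append(candidate)
        candidate += timedelta(minutes=1)
    return runs


# Hypothetical moment at which the serving script was started (UTC).
start = datetime(2024, 6, 14, 7, 42, 30, tzinfo=timezone.utc)
runs = next_runs(start)
for run in runs:
    print(run.isoformat())
```

For a start at 07:42:30 UTC, the first matching times are 07:45, 07:50, and 07:55, so the first scheduled run would be expected about two and a half minutes after startup rather than five.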
### Useful Links
- Prefect Schedules Documentation
- Prefect `serve` Method Example (Documentation Reference)
Feel free to adapt the example to suit your needs. Let me know if you have more questions or need further assistance!