Zivile
04/29/2024, 8:39 PMMarvin
04/29/2024, 8:39 PM@task
decorator and ensure they are called within the @flow
decorated function. Direct scheduling of tasks independent of a flow is not supported; they must be part of a flow to be scheduled and executed by Prefect.
For scheduling flows (which comprise tasks), you can use external schedulers or create deployments on Prefect Cloud or a local Prefect server that include a schedule. Tasks are used for encapsulating discrete units of work and are always executed as part of a flow run.
Here is a basic example of defining and calling tasks within a flow:
python
from prefect import flow, task
@task(task_run_name="Adding {x} and {y}")
def add(x: int, y: int) -> int:
"""just a python function with a fancy hat on!"""
return x + y
@flow
def my_flow():
"""tasks must be called in the flow to be executed"""
first_result = add(1, 2)
second_result = add(first_result, first_result)
return add(first_result, second_result)
if __name__ == "__main__":
# run the flow
my_flow()
# you can run tasks' functions directly outside of a flow if you want
# but it won't be tracked by prefect, it's just a normal function
assert add.fn(1, 2) == 3
For detailed information, please refer to the relevant sections of the Prefect documentation:
- Tasks in a flow
- Flow runs
- Tasks
You can also examine the example code on GitHub.Zivile
04/29/2024, 8:44 PMMarvin
04/29/2024, 8:45 PMpython
from prefect import flow
from datetime import datetime, timedelta
import asyncio
@flow
def child_flow():
# Subflow logic here
pass
@flow
def parent_flow():
# Check the current time and desired subflow time
now = datetime.now()
subflow_time = datetime(2023, 4, 20, 12, 0) # 12:00 PM on April 20, 2023
# Calculate the time difference
time_to_wait = subflow_time - now
if time_to_wait.total_seconds() > 0:
# Wait until the scheduled time
asyncio.sleep(time_to_wait.total_seconds())
# Run the subflow
child_flow()
if __name__ == "__main__":
parent_flow()
This example assumes that the parent flow is running and will execute the subflow when the scheduled time is reached. For a more robust solution, you might need to integrate Prefect's Automation concepts, which allow you to react to certain events, but in this case, you would be creating a custom event based on time within your own parent flow's logic.
For more advanced scheduling capabilities, I recommend reviewing the Automations section of the Prefect documentation and considering the setup of an external scheduler or a deployment that invokes the subflow at the desired time.Nate
04/29/2024, 8:45 PMrun_deployment
!Marvin
04/29/2024, 8:46 PMrun_deployment
function provided by Prefect. The run_deployment
function can schedule the run of an existing deployment when your Python code executes.
Here's an example snippet of how you might use `run_deployment`:
python
from prefect.deployments import run_deployment
@flow
def parent_flow():
# Logic to determine if it's the right time to run the subflow
# ...
# Run a deployment of the subflow
run_deployment(name="my_subflow_deployment_name")
Remember to replace "my_subflow_deployment_name"
with the actual name of the deployment for your subflow. This function allows you to programmatically control flow execution from within Python, meaning that you can potentially put it within a time-checking mechanism in your parent flow's logic to wait for a specific schedule.
For detailed instructions on creating a deployment, you can refer to the Block-based Deployments documentation. This section outlines the configuration, application, and listing of deployments using the Prefect CLI.
Furthermore, the concept of Automations and schedulers available in Prefect also provides event-based rules and time-based schedules for running a deployment.
Once you have your deployment configured, using run_deployment
in your parent flow should allow you to trigger a subflow precisely when your logic conditions (such as reaching a specific time) are met.