Ignacio Herrera
07/29/2024, 4:41 PMMarvin
07/29/2024, 4:41 PMpython
from prefect import task, Flow
from prefect.tasks.control_flow import merge_task
@task
def check_execution_window():
import datetime
now = datetime.datetime.utcnow()
# Assuming tasks should only run between 9 AM - 5 PM UTC
if now.hour < 9 or now.hour > 17:
raise ValueError("Task execution window has passed")
with Flow("conditional_task_execution") as flow:
execution_check = check_execution_window()
# downstream tasks here that depend on execution_check
Example for Custom Conditional Logic within Task:
python
from prefect import task, Flow
@task
def my_task():
import datetime
now = datetime.datetime.utcnow()
scheduled_time = datetime.datetime(2023, 10, 1, 10) # Example scheduled time
if (now - scheduled_time).total_seconds() > 3600: # 1 hour late
raise ValueError("Task is too late to run")
with Flow("custom_logic_task") as flow:
my_task_run = my_task()
Neither of these methods fully automates marking late tasks as failed, but they allow you to control whether tasks run based on time conditions.
For more advanced uses or other ideas, you can check out the Prefect documentation on tasks and concurrency handling here.
If you require a more automated feature, you might want to raise a feature request on GitHub: Prefect GitHub Issues.