Luis Condados
09/24/2025, 9:29 PM
from dotenv import load_dotenv
from prefect import flow
from tasks_and_subflows import (
    subflow_add_embeddings,
    subflow_create_cvat_tasks,
    subflow_download_annotations_from_cvat_tasks,
    subflow_git_dvc_fetch_pull_flow,
    subflow_make_tmp_51dataset,
    subflow_model_evaluation_primary_detector,
    subflow_primary_detector_inference,
    subflow_update_primary_detector_dataset,
    subflow_yolo_nas_training,
)

from mlops.utils import ENV_FILEPATH, get_logger

load_dotenv(ENV_FILEPATH)


@flow(name="workflow-primary-detector")
def primary_detector_workflow():
    logger = get_logger()
    logger.info("Starting primary detector workflow...")

    # Step 1: Sync with Git and DVC
    logger.info("Fetching latest code and data from Git and DVC...")
    subflow_git_dvc_fetch_pull_flow()

    # Step 2: Prepare Temporary Dataset
    logger.info("Preparing temporary dataset...")
    subflow_make_tmp_51dataset()

    # Step 3: Add embeddings
    logger.info("Adding embeddings to dataset...")
    subflow_add_embeddings()

    # Step 4: Add model inference results to dataset
    logger.info("Running primary detector inference...")
    subflow_primary_detector_inference()

    # Step 5: Create CVAT tasks for human annotation
    logger.info("Creating CVAT tasks for annotation...")
    subflow_create_cvat_tasks()

    # Step 6: Download annotations from CVAT tasks
    subflow_download_annotations_from_cvat_tasks()  # it will keep trying until all tasks are completed # noqa: E501

    # Step 7: Update primary detector dataset with new annotations
    logger.info("Updating primary detector dataset with new annotations...")
    subflow_update_primary_detector_dataset()

    # Step 8: Model Training
    logger.info("Training primary detector model...")
    subflow_yolo_nas_training()

    # Step 9: Model Evaluation
    logger.info("Evaluating primary detector model...")
    subflow_model_evaluation_primary_detector()


if __name__ == "__main__":
    primary_detector_workflow()
Marvin
09/24/2025, 9:29 PM
Marvin
09/24/2025, 9:32 PM
from prefect import flow, get_run_logger


class AnnotationsNotReady(RuntimeError):
    """Raised when CVAT tasks are not all complete yet, so the flow retries."""


def annotations_ready() -> bool:
    # TODO: implement your CVAT readiness check
    # return True when all tasks are complete
    return False


def download_annotations():
    # TODO: implement the actual download
    pass


# Note: retry_jitter_factor is a task-only option; @flow does not accept it.
@flow(
    name="download-annotations-from-cvat",
    retries=1000,  # pick a sensible upper bound
    retry_delay_seconds=2 * 60 * 60,  # 2 hours
)
def subflow_download_annotations_from_cvat_tasks():
    logger = get_run_logger()
    if not annotations_ready():
        logger.info("Annotations not ready yet; will retry in ~2 hours.")
        # Raising any exception triggers a retry when retries > 0
        raise AnnotationsNotReady("CVAT annotations are not ready yet")
    logger.info("Annotations ready; downloading...")
    download_annotations()
    logger.info("Download complete.")
Your main flow then stays the same: just call the subflow. It retries every 2 hours until it succeeds or the retry limit is reached, and only after a successful download does the main flow proceed to step 7.
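As a starting point for the annotations_ready() TODO, here is a minimal sketch of the readiness logic on its own. It assumes you can already get one status string per CVAT task and that a finished task reports the status "completed". The fetch_statuses parameter is a hypothetical hook for however you query CVAT; it is not part of Prefect or the CVAT SDK.

```python
from typing import Callable, Iterable, List

# Assumption: a finished CVAT task reports this status string.
COMPLETED = "completed"


def all_tasks_completed(statuses: Iterable[str]) -> bool:
    """Return True only if there is at least one task and all are completed."""
    status_list: List[str] = list(statuses)
    # An empty list is treated as "not ready" so that a failed or empty
    # lookup never lets the pipeline continue by accident.
    return bool(status_list) and all(s == COMPLETED for s in status_list)


def annotations_ready(fetch_statuses: Callable[[], Iterable[str]]) -> bool:
    """Readiness check; fetch_statuses is a hypothetical hook that queries CVAT."""
    return all_tasks_completed(fetch_statuses())
```

Keeping the decision in a pure function means you can unit test it without a CVAT server, and the subflow only has to raise when it returns False.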
Option B — Keep the subflow as-is and set options at call time
If you don’t want to modify the subflow decorator, set retries when you call it:
# Step 6: Download annotations from CVAT tasks (poll every 2h)
# Note: retry_jitter_factor is a task-only option, so it is not passed here.
subflow_download_annotations_from_cvat_tasks.with_options(
    retries=1000,
    retry_delay_seconds=2 * 60 * 60,
)()
Important notes
- Don’t sleep for hours inside a task/subflow (e.g., time.sleep(7200)): that ties up a worker slot. Prefer raising an exception to let Prefect schedule the next attempt.
- Your flow is already sequential as written (each call waits for the previous to finish). Just make sure your subflow raises when annotations aren’t ready.
- Consider a safety cap: set a maximum wait (e.g., via retries count or a timestamp check). You can also add timeout_seconds on the subflow if you want a hard stop.
- Slack formatted your logger calls as links; in real code, make sure these are plain logger.info("...") calls.
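To make the safety cap concrete, here is a small sketch of the arithmetic, assuming a fixed retry delay. Both helper names (retries_for_max_wait and past_deadline) are made up for illustration and are not Prefect APIs.

```python
import math
from datetime import datetime, timedelta, timezone
from typing import Optional


def retries_for_max_wait(max_wait: timedelta, retry_delay: timedelta) -> int:
    """Number of retries that fit inside max_wait at a fixed retry delay."""
    if retry_delay <= timedelta(0):
        raise ValueError("retry_delay must be positive")
    # Dividing two timedeltas yields a float ratio.
    return max(0, math.floor(max_wait / retry_delay))


def past_deadline(
    started_at: datetime,
    max_wait: timedelta,
    now: Optional[datetime] = None,
) -> bool:
    """Timestamp check: True once more than max_wait has elapsed since started_at."""
    current = now if now is not None else datetime.now(timezone.utc)
    return current - started_at > max_wait
```

For example, retries_for_max_wait(timedelta(days=7), timedelta(hours=2)) gives 84, so retries=84 with a 2-hour delay caps the wait at roughly one week. By the same arithmetic, retries=1000 allows about 2000 hours, or roughly 83 days.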
Assuming Prefect 3.x:
- Flows and tasks both support retries and retry_delay_seconds in their decorators or via .with_options(...). retry_jitter_factor is available on tasks only, not on flows.
- Raising any exception triggers the retry when configured.
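To show what "raise to retry" means in practice, here is a plain-Python stand-in for the pattern. This is not Prefect's implementation; run_with_retries is a hypothetical helper that only mimics the semantics of retries and retry_delay_seconds, so the behavior can be checked without a Prefect server.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int,
    retry_delay_seconds: float,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn once, then up to `retries` more times if it raises."""
    total_attempts = retries + 1
    for attempt in range(1, total_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == total_attempts:
                # Retries exhausted: surface the last error to the caller.
                raise
            sleep(retry_delay_seconds)
    raise AssertionError("unreachable")  # the loop always returns or raises
```

The sleep parameter is injectable so a test can pass a fake and avoid real waiting. The last exception propagates once the retry budget is used up, which mirrors how a run ends up failed after its final attempt.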
Refs:
- API reference for tasks and flows (retries/delays): Prefect 3 API reference
- Notes on retries/delays in Prefect 3: Prefect 3.2 release notes
If you share how you check CVAT task completeness, I can help wire that into the subflow so it raises only when appropriate and returns once everything’s ready.