# ask-marvin
@Marvin I'm trying to implement a long flow made of subflows and tasks. I'd like it to run sequentially, especially at step 6 (download the annotations from the annotator tool). In that step the flow will try to get the annotations, but they might not be done yet, so I want it to keep checking every 2h and only proceed to the next step once it successfully gets the annotations.
from dotenv import load_dotenv
from prefect import flow
from tasks_and_subflows import (
    subflow_add_embeddings,
    subflow_create_cvat_tasks,
    subflow_download_annotations_from_cvat_tasks,
    subflow_git_dvc_fetch_pull_flow,
    subflow_make_tmp_51dataset,
    subflow_model_evaluation_primary_detector,
    subflow_primary_detector_inference,
    subflow_update_primary_detector_dataset,
    subflow_yolo_nas_training,
)

from mlops.utils import ENV_FILEPATH, get_logger

load_dotenv(ENV_FILEPATH)

@flow(name="workflow-primary-detector")
def primary_detector_workflow():
    logger = get_logger()
    logger.info("Starting primary detector workflow...")

    # Step 1: Sync with Git and DVC
    logger.info("Fetching latest code and data from Git and DVC...")
    subflow_git_dvc_fetch_pull_flow()

    # Step 2: Prepare Temporary Dataset
    logger.info("Preparing temporary dataset...")
    subflow_make_tmp_51dataset()

    # Step 3: Add embeddings
    logger.info("Adding embeddings to dataset...")
    subflow_add_embeddings()

    # Step 4: Add model inference results to dataset
    logger.info("Running primary detector inference...")
    subflow_primary_detector_inference()

    # Step 5: Create CVAT tasks for human annotation
    logger.info("Creating CVAT tasks for annotation...")
    subflow_create_cvat_tasks()

    # Step 6: Download annotations from CVAT tasks
    # (should keep trying until all tasks are completed)
    logger.info("Downloading annotations from CVAT tasks...")
    subflow_download_annotations_from_cvat_tasks()

    # Step 7: Update primary detector dataset with new annotations
    logger.info("Updating primary detector dataset with new annotations...")
    subflow_update_primary_detector_dataset()

    # Step 8: Model Training
    logger.info("Training primary detector model...")
    subflow_yolo_nas_training()

    # Step 9: Model Evaluation
    logger.info("Evaluating primary detector model...")
    subflow_model_evaluation_primary_detector()

if __name__ == "__main__":
    primary_detector_workflow()
Great question. You can do this cleanly in Prefect 3.x by giving your “download annotations” step retries with a 2h retry delay and having it raise when annotations aren’t ready yet. Prefect will wait out the delay and run the step again, and your parent flow will wait for that subflow to succeed before moving on.

Two ways to do it:

Option A: Put retries on the subflow itself

Inside tasks_and_subflows.py, give the subflow retries and raise when not ready:
from prefect import flow, get_run_logger

class AnnotationsNotReady(RuntimeError):
    pass

def annotations_ready() -> bool:
    # TODO: implement your CVAT readiness check
    # return True when all tasks are complete
    return False

def download_annotations():
    # TODO: implement the actual download
    pass

@flow(
    name="download-annotations-from-cvat",
    retries=1000,                    # pick a sensible upper bound
    retry_delay_seconds=2 * 60 * 60, # 2 hours
    # note: retry_jitter_factor is a task-only option; @flow doesn't accept it
)
def subflow_download_annotations_from_cvat_tasks():
    logger = get_run_logger()

    if not annotations_ready():
        logger.info("Annotations not ready yet; will retry in ~2 hours.")
        # Raising any exception triggers a retry when retries > 0
        raise AnnotationsNotReady("CVAT annotations are not ready yet")

    logger.info("Annotations ready: downloading...")
    download_annotations()
    logger.info("Download complete.")
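One way to fill in the annotations_ready() stub is to turn the task statuses into a check that raises with a useful message. A minimal sketch, assuming you already have a dict mapping each CVAT task id to its status string and that "completed" is the finished status (check this against your CVAT version). pending_tasks and ensure_annotations_ready are hypothetical helper names, and fetching the statuses from CVAT is left out:

```python
from collections.abc import Mapping


class AnnotationsNotReady(RuntimeError):
    """Raised while at least one CVAT task is still being annotated."""


# Assumption: a task counts as done when its status is "completed".
DONE_STATUS = "completed"


def pending_tasks(statuses: Mapping[int, str]) -> list[int]:
    """Return the sorted ids of tasks whose status is not yet 'completed'."""
    return sorted(
        task_id
        for task_id, status in statuses.items()
        if status.lower() != DONE_STATUS
    )


def ensure_annotations_ready(statuses: Mapping[int, str]) -> None:
    """Raise AnnotationsNotReady (which triggers a retry) if anything is pending."""
    if not statuses:
        # Hypothetical policy: no tasks found yet is treated as "not ready".
        raise AnnotationsNotReady("No CVAT tasks found yet")
    pending = pending_tasks(statuses)
    if pending:
        raise AnnotationsNotReady(
            f"{len(pending)} CVAT task(s) still pending: {pending}"
        )
```

Inside the subflow you would call ensure_annotations_ready(statuses) before download_annotations(); because it raises while anything is pending, the retries configured above take care of the 2h polling, and the pending ids show up in the run logs.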
Your main flow then stays the same: just call the subflow. It will keep retrying every 2h until it succeeds, and only then will the flow proceed to step 7.

Option B: Keep the subflow as-is and set options at call time

If you don’t want to modify the subflow decorator, set retries when you call it:
# Step 6: Download annotations from CVAT tasks (poll every 2h)
subflow_download_annotations_from_cvat_tasks.with_options(
    retries=1000,
    retry_delay_seconds=2 * 60 * 60,
)()
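To make the control flow concrete, here is a rough plain-Python model of what retries=N with retry_delay_seconds=D means for the caller. It is an illustration, not Prefect's engine code; call_with_retries is a hypothetical name, and sleep is injectable so the sketch can be exercised without waiting two hours:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retries: int,
    retry_delay_seconds: float,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; on any exception, wait retry_delay_seconds and try again.

    Makes at most retries + 1 attempts and re-raises the last exception
    once the retries are used up. The caller is blocked the whole time.
    """
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= retries:
                raise  # out of retries: propagate the last failure
            attempt += 1
            sleep(retry_delay_seconds)
```

The caller only gets control back when fn() succeeds or the retries run out, which is the same reason step 7 in your flow can't start before step 6 has finished.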
Important notes:
- Don’t hand-roll the wait with a long sleep inside a task/subflow (e.g., time.sleep(7200)). Prefer raising an exception and letting Prefect’s retries schedule the next attempt, which gives you attempt tracking, logging and a configurable cap. Keep in mind that the wait still happens inside the running flow process (the infrastructure isn’t torn down between attempts), so the flow run stays up for the whole polling period.
- Your flow is already sequential as written (each call waits for the previous one to finish). Just make sure your subflow raises when annotations aren’t ready.
- Consider a safety cap: set a maximum wait (e.g., via the retries count or a timestamp check). You can also add timeout_seconds on the subflow if you want a hard stop.
- Slack formatted your logger calls as links; in real code make sure these are just logger.info("...").

Assuming Prefect 3.x:
- Flows and tasks both support retries and retry_delay_seconds in their decorators or via .with_options(...). retry_jitter_factor is only accepted by tasks, not flows.
- Raising any exception triggers the retry when configured.

Refs:
- API reference for tasks and flows (retries/delays): Prefect 3 API reference
- Notes on retries/delays in Prefect 3: Prefect 3.2 release notes

If you share how you check CVAT task completeness, I can help wire that into the subflow so it raises only when appropriate and returns once everything’s ready.
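For the safety cap, the total polling window is roughly retries × retry_delay_seconds (plus the time each attempt takes), so retries=1000 with a 2h delay allows about 2000 hours, roughly 83 days. A small sketch for choosing the bound the other way round; retries_for_max_wait and max_wait_for_retries are hypothetical helper names:

```python
import math
from datetime import timedelta


def retries_for_max_wait(max_wait: timedelta, interval: timedelta) -> int:
    """Retries needed so polling covers roughly max_wait at the given interval.

    Each retry starts one interval after the previous attempt, so
    ceil(max_wait / interval) retries span the whole window.
    """
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    return math.ceil(max_wait / interval)


def max_wait_for_retries(retries: int, interval: timedelta) -> timedelta:
    """Total waiting implied by a retries setting (ignores attempt run time)."""
    return retries * interval
```

For example, to give annotators at most a week, retries_for_max_wait(timedelta(days=7), timedelta(hours=2)) returns 84, which you would pass as retries=84.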