# ask-marvin
m
@Marvin in the below case, what is the best way to make the bigquery_load subflow wait for all of its related previous task runs before starting to run? It's also important that, in the Prefect UI, the tasks are related to their respective subflow runs.

```python
@flow(timeout_seconds=1800, log_prints=True)
async def usage_meter_global_pipeline():
    db_file = "usage_meter.duckdb"

    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    vmh_reports = extract_vmh_reports.map(
        name=unmapped("vmh_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    try:
        bigquery_load(
            name="hur_reports",
            table_name="hur_reports",
            field_schema=class_map["hur_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["hur_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
            wait_for=[task.result(raise_on_failure=False) for task in hur_reports],
        )
    except Exception as e:
        logger.error(f"Error loading HUR reports: {e}")

    try:
        bigquery_load(
            name="vmh_reports",
            table_name="vmh_reports",
            field_schema=class_map["vmh_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
            wait_for=[task.result(raise_on_failure=False) for task in hur_reports],
        )
    except Exception as e:
        logger.error(f"Error loading VMH reports: {e}")
```
m
thought for 19.1 seconds
Let me help you optimize the task dependencies and subflow visibility in your pipeline. There are a few issues to address in your current implementation:

1. The `wait_for` parameter in the second `bigquery_load` call incorrectly waits for `hur_reports` instead of `vmh_reports`.
2. The error handling could be improved to maintain flow state visibility.
3. The subflow relationships could be made more explicit.

Here's the improved version:
```python
# extract_*, bigquery_load, credentials, and config objects are assumed to be
# defined elsewhere in your project, as in your original flow.
from prefect import flow, task, unmapped
from typing import List

@flow(timeout_seconds=1800, log_prints=True)
async def usage_meter_global_pipeline():
    db_file = "usage_meter.duckdb"

    # Extract data with map
    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    vmh_reports = extract_vmh_reports.map(
        name=unmapped("vmh_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    # Create subflows for loading data
    await load_hur_data(
        hur_reports=hur_reports,
        db_file=db_file,
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
    )

    await load_vmh_data(
        vmh_reports=vmh_reports,
        db_file=db_file,
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
    )

@flow(name="Load HUR Data")
async def load_hur_data(
    hur_reports: List,
    db_file: str,
    gcp_bucket: str,
    sa_project_id: str,
    bq_project_id: str,
    env: str,
):
    # Wait for all mapped tasks to complete and collect results
    results = [task.result(raise_on_failure=False) for task in hur_reports]
    
    return await bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
        wait_for=results,  # Pass the collected results
    )

@flow(name="Load VMH Data")
async def load_vmh_data(
    vmh_reports: List,
    db_file: str,
    gcp_bucket: str,
    sa_project_id: str,
    bq_project_id: str,
    env: str,
):
    # Wait for all mapped tasks to complete and collect results
    results = [task.result(raise_on_failure=False) for task in vmh_reports]
    
    return await bigquery_load(
        name="vmh_reports",
        table_name="vmh_reports",
        field_schema=class_map["vmh_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
        wait_for=results,  # Pass the collected results
    )
```
Key improvements in this version:

1. Created separate subflows for loading HUR and VMH data, which will show up as distinct entities in the Prefect UI with their own task groups.
2. Each subflow explicitly waits for its respective mapped tasks to complete before running the `bigquery_load`.
3. Fixed the dependency in the VMH load to wait for `vmh_reports` instead of `hur_reports`.
4. Removed the try/except blocks from the main flow and let Prefect handle the error states naturally, which provides better visibility in the UI.
5. Made the code more maintainable by separating concerns into distinct subflows.

In the Prefect UI, you'll now see:
- The main flow `usage_meter_global_pipeline`
- Two distinct subflows: "Load HUR Data" and "Load VMH Data"
- Clear task dependencies within each subflow
- The mapped tasks properly associated with their respective subflows

This structure ensures that:
1. Each BigQuery load operation waits for all its upstream tasks to complete.
2. The dependency structure is clearly visible in the UI.
3. Failures are properly tracked and visible at the appropriate level in the flow hierarchy.
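For reference, the subflow behaviour relied on here is simply what happens when one flow is called inside another. A minimal sketch, assuming Prefect 2.x, with made-up `parent`/`child` names:

```python
from prefect import flow, task

@task
def extract(x: int) -> int:
    return x + 1

# A flow called from inside another flow runs as a subflow: it gets its own
# flow run in the UI, and the tasks it runs are grouped under it.
@flow(name="child-load")
def child(values: list) -> int:
    return sum(values)

@flow(name="parent-pipeline")
def parent():
    futures = extract.map([1, 2, 3])
    # Resolve the mapped results, then hand them to the subflow call.
    return child([f.result() for f in futures])
```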
m
@Marvin I didn't like your approach. I don't want to create a new flow or anything. I want to use what I have. It's also important to mention that I'm using Prefect 2.x.
m
thought for 13.9 seconds
Ah, since you're using Prefect 2.x and want to keep your existing structure, let me provide a more targeted solution. The main issue is ensuring proper task dependencies while maintaining visibility in the UI. Here's how to modify your existing code:
```python
@flow(timeout_seconds=1800, log_prints=True)
async def usage_meter_global_pipeline():
    db_file = "usage_meter.duckdb"

    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    vmh_reports = extract_vmh_reports.map(
        name=unmapped("vmh_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    # Wait for all HUR reports to complete before proceeding
    await gather(*[task for task in hur_reports])

    try:
        bigquery_load(
            name="hur_reports",
            table_name="hur_reports",
            field_schema=class_map["hur_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["hur_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
        )
    except Exception as e:
        logger.error(f"Error loading HUR reports: {e}")

    # Wait for all VMH reports to complete before proceeding
    await gather(*[task for task in vmh_reports])

    try:
        bigquery_load(
            name="vmh_reports",
            table_name="vmh_reports",
            field_schema=class_map["vmh_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
        )
    except Exception as e:
        logger.error(f"Error loading VMH reports: {e}")
Key changes:

1. Removed the `wait_for` parameter from the `bigquery_load` calls.
2. Added `await gather(*[task for task in hur_reports])` and `await gather(*[task for task in vmh_reports])` before their respective BigQuery loads.
3. Fixed the VMH reports section to wait for `vmh_reports` instead of `hur_reports`.

This approach:
- Maintains your existing flow structure
- Ensures all mapped tasks complete before their respective BigQuery loads
- Keeps the task relationships visible in the Prefect UI
- Works correctly with Prefect 2.x

Make sure to import `gather`:
```python
from asyncio import gather
```
The `gather()` call will wait for all mapped tasks to complete before proceeding to the BigQuery load, while maintaining proper task relationships in the UI. This is a simpler and more direct approach than using `wait_for` or creating additional flows.
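As a side note, the waiting behaviour of `gather` itself is plain `asyncio` and can be seen outside of Prefect entirely — a small self-contained illustration with a made-up `fake_extract` coroutine:

```python
import asyncio

async def fake_extract(i: int) -> str:
    # Stand-in for a slow extract step.
    await asyncio.sleep(0.1)
    return f"report-{i}"

async def main():
    # gather() only returns once every awaitable passed to it has finished.
    results = await asyncio.gather(*[fake_extract(i) for i in range(3)])
    print(results)  # ['report-0', 'report-1', 'report-2']

asyncio.run(main())
```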
m
@Marvin the wait_for is important to make the tasks and the subflow related in the Prefect UI.
m
thought for 11.6 seconds
Ah, you're absolutely right - the `wait_for` parameter is important for maintaining the visual task relationships in the Prefect UI. Let's modify your code to properly use `wait_for` while ensuring all tasks complete:
```python
@flow(timeout_seconds=1800, log_prints=True)
async def usage_meter_global_pipeline():
    db_file = "usage_meter.duckdb"

    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    vmh_reports = extract_vmh_reports.map(
        name=unmapped("vmh_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    # Collect all HUR report tasks
    hur_results = [task for task in hur_reports]
    
    try:
        bigquery_load(
            name="hur_reports",
            table_name="hur_reports",
            field_schema=class_map["hur_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["hur_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
            wait_for=hur_results,  # Pass the entire list of tasks
        )
    except Exception as e:
        logger.error(f"Error loading HUR reports: {e}")

    # Collect all VMH report tasks
    vmh_results = [task for task in vmh_reports]
    
    try:
        bigquery_load(
            name="vmh_reports",
            table_name="vmh_reports",
            field_schema=class_map["vmh_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
            wait_for=vmh_results,  # Pass the entire list of tasks
        )
    except Exception as e:
        logger.error(f"Error loading VMH reports: {e}")
Key changes:

1. Removed the `.result()` calls from the `wait_for` lists.
2. Passed the entire list of task futures directly to `wait_for`.
3. Fixed the VMH section to wait for `vmh_results` instead of the HUR reports.

This approach will:
- Maintain proper task dependencies in the Prefect UI
- Ensure all mapped tasks complete before their respective BigQuery loads
- Keep the visual relationship between tasks and subflows in the UI
- Work correctly with Prefect 2.x

The `wait_for` parameter will now properly show the dependencies in the UI while ensuring all upstream tasks complete before the BigQuery load tasks begin.
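Stripped of the pipeline-specific details, the pattern above looks roughly like this — a minimal sketch with made-up task names, assuming the Prefect 2.x `wait_for` keyword on task calls:

```python
from prefect import flow, task, unmapped

@task
def extract(item: int, source: str) -> int:
    return item * 2

@task
def load(table: str) -> str:
    return f"loaded {table}"

@flow
def demo():
    # .map() returns a list of PrefectFutures, one per mapped run.
    futures = extract.map([1, 2, 3], source=unmapped("demo"))
    # Passing the futures to wait_for makes `load` start only after every
    # mapped run finishes, and draws the dependency edges in the UI.
    load("demo_table", wait_for=futures)
```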
m
@Marvin if I do as you said, the subflow will not execute when some hur_reports or vmh_reports task run fails. And I want the opposite.
m
thought for 12.8 seconds
Ah, now I understand - you want the BigQuery load subflow to run even if some of the upstream tasks fail. That's why you had `raise_on_failure=False` in the original code. Here's the correct version:
```python
@flow(timeout_seconds=1800, log_prints=True)
async def usage_meter_global_pipeline():
    db_file = "usage_meter.duckdb"

    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    vmh_reports = extract_vmh_reports.map(
        name=unmapped("vmh_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    # Wait for all HUR tasks to complete, collecting results regardless of failures
    hur_results = []
    for task in hur_reports:
        try:
            # Block until the mapped run reaches a final state; wait() does not
            # raise if the run failed, so failures don't stop the flow here.
            await task.wait()
        except Exception:
            pass
        hur_results.append(task)

    try:
        bigquery_load(
            name="hur_reports",
            table_name="hur_reports",
            field_schema=class_map["hur_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["hur_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
            wait_for=hur_results,  # Pass all tasks, including failed ones
        )
    except Exception as e:
        logger.error(f"Error loading HUR reports: {e}")

    # Wait for all VMH tasks to complete, collecting results regardless of failures
    vmh_results = []
    for task in vmh_reports:
        try:
            # Same as above: wait for a final state without raising on failure.
            await task.wait()
        except Exception:
            pass
        vmh_results.append(task)

    try:
        bigquery_load(
            name="vmh_reports",
            table_name="vmh_reports",
            field_schema=class_map["vmh_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
            wait_for=vmh_results,  # Pass all tasks, including failed ones
        )
    except Exception as e:
        logger.error(f"Error loading VMH reports: {e}")
This version:

1. Waits for all tasks to reach a final state (success or failure) using `await`.
2. Maintains the task relationships in the UI using `wait_for`.
3. Allows the BigQuery load to proceed even if some upstream tasks failed.
4. Preserves error handling at both the task and subflow level.

The key difference is that we're:
- Explicitly waiting on each task but catching any exceptions
- Adding all tasks to the `wait_for` list regardless of their success/failure state
- Maintaining the visual relationships in the UI while allowing the workflow to continue even with upstream failures

This should give you exactly what you want: the BigQuery loads will execute even if some of their upstream tasks fail, while still maintaining proper task relationships in the Prefect UI.
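Worth noting: Prefect 2 also ships an `allow_failure` annotation aimed at exactly this situation — keep the `wait_for` dependency (and the edge in the UI) but let the downstream work run even when an upstream run fails. A minimal sketch, assuming Prefect 2.x, with made-up task names:

```python
from prefect import allow_failure, flow, task

@task
def extract(i: int) -> int:
    if i == 2:
        raise ValueError("boom")  # simulate one failed mapped run
    return i

@task
def load() -> str:
    return "loaded"

@flow
def demo():
    futures = extract.map([1, 2, 3])
    # allow_failure keeps the dependency edge but lets `load` run even though
    # one of the upstream mapped runs failed.
    load(wait_for=[allow_failure(f) for f in futures])
```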