Matheus
07/10/2025, 4:43 PM
Marvin
07/10/2025, 4:43 PM
Marvin
07/10/2025, 4:43 PM
1. The wait_for parameter in the second bigquery_load call is incorrectly waiting for hur_reports instead of vmh_reports
2. The error handling could be improved to maintain flow state visibility
3. The subflow relationships could be made more explicit
Here's the improved version:
python
from prefect import flow, task, unmapped
from typing import List
from prefect.tasks import task_input_hash

# extract_hur_reports, extract_vmh_reports, bigquery_load, class_map,
# TABLE_CONFIG, usage_meter_credentials, and the gcp_*/env variables are
# assumed to be defined elsewhere in your module, as in your original flow.

@flow(timeout_seconds=1800, log_prints=True)
async def usage_meter_global_pipeline():
    db_file = "usage_meter.duckdb"

    # Extract data with map
    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )
    vmh_reports = extract_vmh_reports.map(
        name=unmapped("vmh_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    # Create subflows for loading data
    await load_hur_data(
        hur_reports=hur_reports,
        db_file=db_file,
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
    )
    await load_vmh_data(
        vmh_reports=vmh_reports,
        db_file=db_file,
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
    )


@flow(name="Load HUR Data")
async def load_hur_data(
    hur_reports: List,
    db_file: str,
    gcp_bucket: str,
    sa_project_id: str,
    bq_project_id: str,
    env: str,
):
    # Wait for all mapped tasks to complete and collect results
    results = [task.result(raise_on_failure=False) for task in hur_reports]
    return await bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
        wait_for=results,  # Pass the collected results
    )


@flow(name="Load VMH Data")
async def load_vmh_data(
    vmh_reports: List,
    db_file: str,
    gcp_bucket: str,
    sa_project_id: str,
    bq_project_id: str,
    env: str,
):
    # Wait for all mapped tasks to complete and collect results
    results = [task.result(raise_on_failure=False) for task in vmh_reports]
    return await bigquery_load(
        name="vmh_reports",
        table_name="vmh_reports",
        field_schema=class_map["vmh_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
        wait_for=results,  # Pass the collected results
    )
Key improvements in this version:
1. Created separate subflows for loading HUR and VMH data, which will show up as distinct entities in the Prefect UI with their own task groups
2. Each subflow explicitly waits for its respective mapped tasks to complete before running the bigquery_load
3. Fixed the dependency in the VMH load to wait for vmh_reports instead of hur_reports
4. Removed the try/except blocks from the main flow and let Prefect handle the error states naturally, which provides better visibility in the UI
5. Made the code more maintainable by separating concerns into distinct subflows
In the Prefect UI, you'll now see:
- The main flow usage_meter_global_pipeline
- Two distinct subflows: "Load HUR Data" and "Load VMH Data"
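For illustration, here's a minimal standalone sketch (flow names made up, unrelated to your pipeline) of how calling one flow from inside another produces a nested subflow run in the UI:
python
from prefect import flow

@flow(name="Load Data")
def load_data(rows: list) -> int:
    # A flow called from inside another flow becomes a subflow run
    return sum(rows)

@flow
def parent_pipeline():
    # Appears in the UI as "parent-pipeline" with "Load Data" nested under it
    return load_data([1, 2, 3])

if __name__ == "__main__":
    parent_pipeline()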
Marvin
07/10/2025, 4:43 PM
Matheus
07/10/2025, 4:46 PM
Marvin
07/10/2025, 4:46 PM
Marvin
07/10/2025, 4:47 PM
python
@flow(timeout_seconds=1800, log_prints=True)
async def usage_meter_global_pipeline():
    db_file = "usage_meter.duckdb"

    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )
    vmh_reports = extract_vmh_reports.map(
        name=unmapped("vmh_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    # Wait for all HUR reports to complete before proceeding
    await gather(*[task for task in hur_reports])
    try:
        bigquery_load(
            name="hur_reports",
            table_name="hur_reports",
            field_schema=class_map["hur_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["hur_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
        )
    except Exception as e:
        logger.error(f"Error loading HUR reports: {e}")

    # Wait for all VMH reports to complete before proceeding
    await gather(*[task for task in vmh_reports])
    try:
        bigquery_load(
            name="vmh_reports",
            table_name="vmh_reports",
            field_schema=class_map["vmh_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
        )
    except Exception as e:
        logger.error(f"Error loading VMH reports: {e}")
Key changes:
1. Removed the wait_for parameter from the bigquery_load calls
2. Added await gather(*[task for task in hur_reports]) and await gather(*[task for task in vmh_reports]) before their respective BigQuery loads
3. Fixed the VMH reports section to wait for vmh_reports instead of hur_reports
This approach:
- Maintains your existing flow structure
- Ensures all mapped tasks complete before their respective BigQuery loads
- Keeps the task relationships visible in the Prefect UI
- Works correctly with Prefect 2.x
Make sure to import `gather`:
python
from asyncio import gather
The gather() function will wait for all mapped tasks to complete before proceeding to the BigQuery load, while maintaining proper task relationships in the UI. This is a simpler and more direct approach than using wait_for or creating additional flows.
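As a plain-asyncio illustration of what gather does (the coroutine names here are made up):
python
import asyncio

async def extract(name: str) -> str:
    await asyncio.sleep(0.1)  # simulate an I/O-bound extract
    return f"{name} extracted"

async def main():
    # gather runs both coroutines concurrently and waits until all have finished
    results = await asyncio.gather(extract("hur"), extract("vmh"))
    print(results)  # ['hur extracted', 'vmh extracted']

asyncio.run(main())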
Matheus
07/10/2025, 4:48 PM
Marvin
07/10/2025, 4:48 PM
Marvin
07/10/2025, 4:48 PM
The wait_for parameter is important for maintaining the visual task relationships in the Prefect UI. Let's modify your code to properly use wait_for while ensuring all tasks complete:
python
@flow(timeout_seconds=1800, log_prints=True)
async def usage_meter_global_pipeline():
    db_file = "usage_meter.duckdb"

    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )
    vmh_reports = extract_vmh_reports.map(
        name=unmapped("vmh_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    # Collect all HUR report tasks
    hur_results = [task for task in hur_reports]
    try:
        bigquery_load(
            name="hur_reports",
            table_name="hur_reports",
            field_schema=class_map["hur_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["hur_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
            wait_for=hur_results,  # Pass the entire list of tasks
        )
    except Exception as e:
        logger.error(f"Error loading HUR reports: {e}")

    # Collect all VMH report tasks
    vmh_results = [task for task in vmh_reports]
    try:
        bigquery_load(
            name="vmh_reports",
            table_name="vmh_reports",
            field_schema=class_map["vmh_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
            wait_for=vmh_results,  # Pass the entire list of tasks
        )
    except Exception as e:
        logger.error(f"Error loading VMH reports: {e}")
Key changes:
1. Removed the .result() calls from the wait_for lists
2. Passed the entire list of task futures directly to wait_for
3. Fixed the VMH section to wait for vmh_results instead of the HUR reports
This approach will:
- Maintain proper task dependencies in the Prefect UI
- Ensure all mapped tasks complete before their respective BigQuery loads
- Keep the visual relationship between tasks and subflows in the UI
- Work correctly with Prefect 2.x
The wait_for parameter will now properly show the dependencies in the UI while ensuring all upstream tasks complete before the BigQuery load tasks begin.
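Here's a small standalone sketch (task names made up, not your pipeline) of passing a list of mapped futures to wait_for so the downstream task only starts after all of them finish:
python
from prefect import flow, task

@task
def extract(x: int) -> int:
    return x * 2

@task
def load(label: str) -> str:
    return f"loaded {label}"

@flow
def demo():
    futures = extract.map([1, 2, 3])
    # "load" is ordered after every mapped "extract", and the edge is drawn in the UI
    load.submit("demo", wait_for=futures)

if __name__ == "__main__":
    demo()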
Matheus
07/10/2025, 4:48 PM
Marvin
07/10/2025, 4:49 PM
Marvin
07/10/2025, 4:49 PM
You're right about the raise_on_failure=False in the original code. Here's the correct version:
python
@flow(timeout_seconds=1800, log_prints=True)
async def usage_meter_global_pipeline():
    db_file = "usage_meter.duckdb"

    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )
    vmh_reports = extract_vmh_reports.map(
        name=unmapped("vmh_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    # Wait for all HUR tasks to complete, collecting results regardless of failures
    hur_results = []
    for task in hur_reports:
        try:
            await task
        except Exception:
            pass
        hur_results.append(task)

    try:
        bigquery_load(
            name="hur_reports",
            table_name="hur_reports",
            field_schema=class_map["hur_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["hur_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
            wait_for=hur_results,  # Pass all tasks, including failed ones
        )
    except Exception as e:
        logger.error(f"Error loading HUR reports: {e}")

    # Wait for all VMH tasks to complete, collecting results regardless of failures
    vmh_results = []
    for task in vmh_reports:
        try:
            await task
        except Exception:
            pass
        vmh_results.append(task)

    try:
        bigquery_load(
            name="vmh_reports",
            table_name="vmh_reports",
            field_schema=class_map["vmh_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
            wait_for=vmh_results,  # Pass all tasks, including failed ones
        )
    except Exception as e:
        logger.error(f"Error loading VMH reports: {e}")
This version:
1. Waits for all tasks to complete (success or failure) using await
2. Maintains the task relationships in the UI using wait_for
3. Allows the BigQuery load to proceed even if some upstream tasks failed
4. Preserves error handling at both the task and subflow level
The key difference is that we're:
- Explicitly awaiting each task but catching any exceptions
- Adding all tasks to the wait_for list regardless of their success/failure state
- Maintaining the visual relationships in the UI while allowing the workflow to continue even with upstream failures
This should give you exactly what you want: the BigQuery loads will execute even if some of their upstream tasks fail, while still maintaining proper task relationships in the Prefect UI.