Matheus (07/10/2025, 12:59 PM)

Marvin (07/10/2025, 1:00 PM)
You can use `return_state=True` with mapped tasks together with the `wait_for_tasks()` utility. Here's how you can modify your code to wait for all mapped tasks to complete (regardless of their state) before proceeding with the subflow:
```python
from prefect import flow, task, unmapped
from prefect.utilities.tasks import wait_for_tasks

# Your existing mapped task execution
hur_reports = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True,  # Add this parameter
)

# Wait for all mapped tasks to complete, regardless of state
wait_for_tasks(hur_reports, return_when="all_completed")

# Now proceed with your subflow
try:
    bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
        # Remove the wait_for parameter since we're using wait_for_tasks
    )
except Exception as e:
    logger.error(f"Error loading HUR reports: {e}")
```
However, since you mentioned Prefect 2.x, I should note that if you're starting a new project, you should consider upgrading to Prefect 3.x. In Prefect 3.x, the pattern is slightly different and more straightforward:
```python
from prefect import flow, task, unmapped

@flow
def your_flow():
    # Your mapped task execution
    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )

    # In Prefect 3.x, .map() returns a sequence of futures, and the
    # flow waits for all mapped tasks to complete before it finishes
    try:
        bigquery_load(
            name="hur_reports",
            table_name="hur_reports",
            field_schema=class_map["hur_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["hur_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
        )
    except Exception as e:
        logger.error(f"Error loading HUR reports: {e}")
```
In both versions, this approach will:
1. Execute all mapped tasks
2. Wait for all of them to complete (regardless of success or failure)
3. Proceed with the `bigquery_load` subflow
4. Handle any errors that occur in the `bigquery_load` step separately
This gives you the behavior you want, where failures in the mapped tasks won't prevent the subflow from running, while still ensuring all mapped tasks complete before moving forward.
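In Prefect 3.x you can also wait explicitly rather than relying on the end-of-flow wait. A minimal sketch, assuming the same task and variables as above: `.map()` returns a `PrefectFutureList` whose `wait()` blocks until every run reaches a terminal state, and each future exposes its final state via `.state`.

```python
futures = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
)

# Block until every mapped run reaches a terminal state
futures.wait()

# Inspect the final states without raising on failures
failed = [i for i, f in enumerate(futures) if f.state.is_failed()]
if failed:
    logger.warning(f"hur_reports failed for indices: {failed}")
```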
Matheus (07/10/2025, 1:05 PM)

Marvin (07/10/2025, 1:05 PM)
You can call `wait_for_tasks()` independently for each mapped task group. Here's how you can modify your code to handle both mapped tasks independently in Prefect 2.x:
```python
from prefect import flow, task, unmapped
from prefect.utilities.tasks import wait_for_tasks

hur_reports = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True,
)

vmh_reports = extract_vmh_reports.map(
    name=unmapped("vmh_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True,
)

# Wait for only hur_reports to complete
wait_for_tasks(hur_reports, return_when="all_completed")

try:
    bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading HUR reports: {e}")

# Wait for only vmh_reports to complete
wait_for_tasks(vmh_reports, return_when="all_completed")

try:
    bigquery_load(
        name="vmh_reports",
        table_name="vmh_reports",
        field_schema=class_map["vmh_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading VMH reports: {e}")
```
This approach:
1. Launches both mapped tasks (`hur_reports` and `vmh_reports`) concurrently
2. Waits for all `hur_reports` tasks to complete before running its corresponding `bigquery_load`
3. Then waits for all `vmh_reports` tasks to complete before running its corresponding `bigquery_load`

The `vmh_reports` mapped tasks will continue executing in parallel while the flow waits for `hur_reports` to complete. This maintains the independence of the two processes while ensuring each `bigquery_load` operation waits for its respective mapped tasks to finish.
Again, if you're able to upgrade to Prefect 3.x, the pattern would be simpler, as the dependency management is handled automatically when you reference the mapped task results; see the sketch below.
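For completeness, here's a minimal sketch of that pattern, keeping your original `wait_for=` argument on `bigquery_load`. It uses `allow_failure` (also available in recent 2.x releases), which marks an upstream dependency so the downstream still runs even when that upstream fails; the task and variable names are assumed from your code above.

```python
from prefect import allow_failure

hur_futures = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
)

# bigquery_load waits for every mapped run to finish, but still starts
# even if some of those runs end in a Failed state
bigquery_load(
    name="hur_reports",
    table_name="hur_reports",
    field_schema=class_map["hur_reports"].avro_schema_to_python(),
    table_config=TABLE_CONFIG["hur_reports"].model_dump(),
    gcp_bucket=gcp_bucket,
    sa_project_id=sa_project_id,
    bq_project_id=bq_project_id,
    env=env,
    is_merge=True,
    db_file=db_file,
    wait_for=[allow_failure(f) for f in hur_futures],
)
```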
Matheus (07/10/2025, 1:08 PM)

Marvin (07/10/2025, 1:08 PM)
When you use `wait_for_tasks()` with `return_when="all_completed"`, the bigquery subflow will still run even if some of the mapped tasks fail. "all_completed" means it waits for all tasks to finish executing, regardless of whether they succeeded or failed.
Here's a more explicit way to handle this in Prefect 2.x, which also lets you log which tasks failed:
```python
from prefect import flow, task, unmapped
from prefect.utilities.tasks import wait_for_tasks

hur_reports = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True,
)

vmh_reports = extract_vmh_reports.map(
    name=unmapped("vmh_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True,
)

# Wait for hur_reports and check results
hur_states = wait_for_tasks(hur_reports, return_when="all_completed")
failed_hurs = [i for i, state in enumerate(hur_states) if state.is_failed()]
if failed_hurs:
    logger.warning(f"HUR reports failed for indices: {failed_hurs}")

# Proceed with bigquery_load regardless of failures
try:
    bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading HUR reports: {e}")

# Same pattern for vmh_reports
vmh_states = wait_for_tasks(vmh_reports, return_when="all_completed")
failed_vmhs = [i for i, state in enumerate(vmh_states) if state.is_failed()]
if failed_vmhs:
    logger.warning(f"VMH reports failed for indices: {failed_vmhs}")

try:
    bigquery_load(
        name="vmh_reports",
        table_name="vmh_reports",
        field_schema=class_map["vmh_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading VMH reports: {e}")
```
This code will:
1. Wait for all mapped tasks to complete (success or failure)
2. Log which specific tasks failed (if any)
3. Proceed with the `bigquery_load` subflow regardless of any failures
4. Handle any errors in the `bigquery_load` itself separately
This gives you visibility into which tasks failed while still allowing the workflow to continue as desired.
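One note on the `is_failed()` checks above: in Prefect 2.x, runs that die from infrastructure problems end in a Crashed state, which `is_failed()` does not match. If you want those surfaced too, a small sketch, assuming the same `hur_states` list:

```python
# Crashed is a distinct terminal state from Failed; check both if
# infrastructure crashes should be logged as well
bad_hurs = [
    i for i, state in enumerate(hur_states)
    if state.is_failed() or state.is_crashed()
]
if bad_hurs:
    logger.warning(f"HUR reports failed or crashed for indices: {bad_hurs}")
```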
Matheus (07/10/2025, 1:12 PM)

Marvin (07/10/2025, 1:12 PM)
Apologies, that import path was wrong: `wait_for_tasks` should come from `prefect.tasks.tasks`. Here's the corrected code:
```python
from prefect import flow, task, unmapped
from prefect.tasks.tasks import wait_for_tasks

hur_reports = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True,
)

vmh_reports = extract_vmh_reports.map(
    name=unmapped("vmh_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True,
)

# Wait for hur_reports and check results
hur_states = wait_for_tasks(hur_reports, return_when="all_completed")
failed_hurs = [i for i, state in enumerate(hur_states) if state.is_failed()]
if failed_hurs:
    logger.warning(f"HUR reports failed for indices: {failed_hurs}")

# Proceed with bigquery_load regardless of failures
try:
    bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading HUR reports: {e}")

# Same pattern for vmh_reports
vmh_states = wait_for_tasks(vmh_reports, return_when="all_completed")
failed_vmhs = [i for i, state in enumerate(vmh_states) if state.is_failed()]
if failed_vmhs:
    logger.warning(f"VMH reports failed for indices: {failed_vmhs}")

try:
    bigquery_load(
        name="vmh_reports",
        table_name="vmh_reports",
        field_schema=class_map["vmh_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading VMH reports: {e}")
```
The functionality remains exactly the same, just with the correct import path for `wait_for_tasks`. This will still allow you to wait for all mapped tasks to complete (regardless of their success or failure state) before proceeding with the bigquery subflows.
Let me know if you run into any other issues!

Matheus (07/10/2025, 1:15 PM)

Marvin (07/10/2025, 1:16 PM)
In that case, you can drop `wait_for_tasks` entirely. With `return_state=True`, the mapped call already returns the finished states directly, and you can check those:
```python
from prefect import flow, task, unmapped

# Your mapped tasks with return_state=True
hur_reports = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True,
)

vmh_reports = extract_vmh_reports.map(
    name=unmapped("vmh_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True,
)

# Check states of hur_reports
failed_hurs = [i for i, state in enumerate(hur_reports) if state.is_failed()]
if failed_hurs:
    logger.warning(f"HUR reports failed for indices: {failed_hurs}")

# Proceed with bigquery_load
try:
    bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading HUR reports: {e}")

# Check states of vmh_reports
failed_vmhs = [i for i, state in enumerate(vmh_reports) if state.is_failed()]
if failed_vmhs:
    logger.warning(f"VMH reports failed for indices: {failed_vmhs}")

try:
    bigquery_load(
        name="vmh_reports",
        table_name="vmh_reports",
        field_schema=class_map["vmh_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading VMH reports: {e}")
```
The key points in this approach:
1. Set `return_state=True` on your mapped tasks
2. The mapped call then returns the final states directly
3. You can check those states to see which tasks failed
4. The flow naturally waits for all mapped tasks to complete before moving on
5. The `bigquery_load` tasks will run regardless of any failures in the mapped tasks

This achieves your goal of waiting for all mapped tasks to complete (even if some fail) before running the subsequent `bigquery_load` tasks.
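If you also need the return values of the successful runs (not just their states), a minimal sketch, assuming the same `hur_reports` state list as above; in Prefect 2.x, `state.result(raise_on_failure=False)` retrieves a state's value without re-raising a failed task's exception:

```python
# Collect values from successful runs only; failed states carry the
# exception rather than a value, so filter with is_completed() first
successful_results = [
    state.result(raise_on_failure=False)
    for state in hur_reports
    if state.is_completed()
]
logger.info(f"Extracted {len(successful_results)} successful HUR reports")
```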