<@ULVA73B9P> In Prefect 2, i have this task that r...
# ask-marvin
m
@Marvin In Prefect 2, i have this task that runs using .map. hur_reports = extract_hur_reports.map( name=unmapped("hur_reports"), credentials=unmapped(usage_meter_credentials), db_file=unmapped(db_file), region=region, data=agent_reports, ) As result, this task generate a lot of different runs and I want to wait all of them runs before starting the next step on my workflow (that is a subflow run): try: bigquery_load( name="hur_reports", table_name="hur_reports", field_schema=class_map["hur_reports"].avro_schema_to_python(), table_config=TABLE_CONFIG["hur_reports"].model_dump(), gcp_bucket=gcp_bucket, sa_project_id=sa_project_id, bq_project_id=bq_project_id, env=env, is_merge=True, db_file=db_file, wait_for=[hur_reports], ) except Exception as e: logger.error(f"Error loading HUR reports: {e}") I need to wait all the hur_reports task runs but, even one of them fail, I still want to run my subflow
m
thought for 16.0 seconds
Let me help you adapt this pattern for handling mapped tasks and their dependencies. In Prefect 2.x, the behavior you're describing can be achieved using
return_state=True
with mapped tasks and the
wait_for_tasks()
utility. Here's how you can modify your code to wait for all mapped tasks to complete (regardless of their state) before proceeding with the subflow:
Copy code
python
from prefect import task, flow
from prefect.utilities.tasks import wait_for_tasks

# Your existing mapped task execution
hur_reports = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True  # Add this parameter
)

# Wait for all mapped tasks to complete, regardless of state
wait_for_tasks(hur_reports, return_when="all_completed")

# Now proceed with your subflow
try:
    bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
        # Remove the wait_for parameter since we're using wait_for_tasks
    )
except Exception as e:
    logger.error(f"Error loading HUR reports: {e}")
However, since you mentioned Prefect 2.x, I should note that if you're starting a new project, you should consider upgrading to Prefect 3.x. In Prefect 3.x, the pattern is slightly different and more straightforward:
Copy code
python
from prefect import task, flow

@flow
def your_flow():
    # Your mapped task execution
    hur_reports = extract_hur_reports.map(
        name=unmapped("hur_reports"),
        credentials=unmapped(usage_meter_credentials),
        db_file=unmapped(db_file),
        region=region,
        data=agent_reports,
    )
    
    # In Prefect 3.x, mapped tasks automatically return a sequence of results
    # The flow will wait for all mapped tasks to complete before proceeding
    try:
        bigquery_load(
            name="hur_reports",
            table_name="hur_reports",
            field_schema=class_map["hur_reports"].avro_schema_to_python(),
            table_config=TABLE_CONFIG["hur_reports"].model_dump(),
            gcp_bucket=gcp_bucket,
            sa_project_id=sa_project_id,
            bq_project_id=bq_project_id,
            env=env,
            is_merge=True,
            db_file=db_file,
        )
    except Exception as e:
        logger.error(f"Error loading HUR reports: {e}")
In both versions, this approach will: 1. Execute all mapped tasks 2. Wait for all of them to complete (regardless of success or failure) 3. Proceed with the bigquery_load subflow 4. Handle any errors that occur in the bigquery_load step separately This gives you the behavior you want where failures in the mapped tasks won't prevent the subflow from running, while still ensuring all mapped tasks complete before moving forward.
m
@Marvin my code has another stuff too: hur_reports = extract_hur_reports.map( name=unmapped("hur_reports"), credentials=unmapped(usage_meter_credentials), db_file=unmapped(db_file), region=region, data=agent_reports, ) vmh_reports = extract_vmh_reports.map( name=unmapped("vmh_reports"), credentials=unmapped(usage_meter_credentials), db_file=unmapped(db_file), region=region, data=agent_reports, ) try: bigquery_load( name="hur_reports", table_name="hur_reports", field_schema=class_map["hur_reports"].avro_schema_to_python(), table_config=TABLE_CONFIG["hur_reports"].model_dump(), gcp_bucket=gcp_bucket, sa_project_id=sa_project_id, bq_project_id=bq_project_id, env=env, is_merge=True, db_file=db_file, wait_for=[hur_reports], ) except Exception as e: logger.error(f"Error loading HUR reports: {e}") try: bigquery_load( name="vmh_reports", table_name="vmh_reports", field_schema=class_map["vmh_reports"].avro_schema_to_python(), table_config=TABLE_CONFIG["vmh_reports"].model_dump(), gcp_bucket=gcp_bucket, sa_project_id=sa_project_id, bq_project_id=bq_project_id, env=env, is_merge=True, db_file=db_file, wait_for=[vmh_reports], ) except Exception as e: logger.error(f"Error loading VMH reports: {e}") Can I use "wait_for_tasks()" without implicate on "vmh_reports" execution?
m
thought for 12.7 seconds
Yes, you can use
wait_for_tasks()
independently for each mapped task group. Here's how you can modify your code to handle both mapped tasks independently in Prefect 2.x:
Copy code
python
from prefect import task, flow
from prefect.utilities.tasks import wait_for_tasks

hur_reports = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True
)

vmh_reports = extract_vmh_reports.map(
    name=unmapped("vmh_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True
)

# Wait for only hur_reports to complete
wait_for_tasks(hur_reports, return_when="all_completed")

try:
    bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading HUR reports: {e}")

# Wait for only vmh_reports to complete
wait_for_tasks(vmh_reports, return_when="all_completed")

try:
    bigquery_load(
        name="vmh_reports",
        table_name="vmh_reports",
        field_schema=class_map["vmh_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading VMH reports: {e}")
This approach: 1. Launches both mapped tasks (
hur_reports
and
vmh_reports
) concurrently 2. Waits for all
hur_reports
tasks to complete before running its corresponding
bigquery_load
3. Then waits for all
vmh_reports
tasks to complete before running its corresponding
bigquery_load
The
vmh_reports
mapped tasks will continue executing in parallel while waiting for
hur_reports
to complete. This maintains the independence of the two processes while ensuring each
bigquery_load
operation waits for its respective mapped tasks to finish. Again, if you're able to upgrade to Prefect 3.x, the pattern would be simpler as the dependency management is handled automatically when you reference the mapped task results.
m
@Marvin but if some hur_reports task run fails? will bigquery subflow still run?
m
thought for 13.5 seconds
Yes, when using
wait_for_tasks()
with
return_when="all_completed"
, the bigquery subflow will still run even if some of the mapped tasks fail. "all_completed" means it waits for all tasks to finish executing, regardless of whether they succeeded or failed. Here's a more explicit way to handle this in Prefect 2.x, which also lets you log which tasks failed:
Copy code
python
from prefect import task, flow
from prefect.utilities.tasks import wait_for_tasks
from prefect.states import State

hur_reports = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True
)

vmh_reports = extract_vmh_reports.map(
    name=unmapped("vmh_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True
)

# Wait for hur_reports and check results
hur_states = wait_for_tasks(hur_reports, return_when="all_completed")
failed_hurs = [i for i, state in enumerate(hur_states) if state.is_failed()]
if failed_hurs:
    logger.warning(f"HUR reports failed for indices: {failed_hurs}")

# Proceed with bigquery_load regardless of failures
try:
    bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading HUR reports: {e}")

# Same pattern for vmh_reports
vmh_states = wait_for_tasks(vmh_reports, return_when="all_completed")
failed_vmhs = [i for i, state in enumerate(vmh_states) if state.is_failed()]
if failed_vmhs:
    logger.warning(f"VMH reports failed for indices: {failed_vmhs}")

try:
    bigquery_load(
        name="vmh_reports",
        table_name="vmh_reports",
        field_schema=class_map["vmh_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading VMH reports: {e}")
This code will: 1. Wait for all mapped tasks to complete (success or failure) 2. Log which specific tasks failed (if any) 3. Proceed with the bigquery_load subflow regardless of any failures 4. Handle any errors in the bigquery_load itself separately This gives you visibility into which tasks failed while still allowing the workflow to continue as desired.
m
@Marvin Não foi possível resolver a importação "prefect.utilities.tasks".
m
thought for 12.5 seconds
Ah, I apologize for the confusion. You're right - in Prefect 2.x, the correct way to handle this is using
prefect.tasks.tasks
. Here's the corrected code:
Copy code
python
from prefect import task, flow
from prefect.tasks.tasks import wait_for_tasks
from prefect.states import State

hur_reports = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True
)

vmh_reports = extract_vmh_reports.map(
    name=unmapped("vmh_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True
)

# Wait for hur_reports and check results
hur_states = wait_for_tasks(hur_reports, return_when="all_completed")
failed_hurs = [i for i, state in enumerate(hur_states) if state.is_failed()]
if failed_hurs:
    logger.warning(f"HUR reports failed for indices: {failed_hurs}")

# Proceed with bigquery_load regardless of failures
try:
    bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading HUR reports: {e}")

# Same pattern for vmh_reports
vmh_states = wait_for_tasks(vmh_reports, return_when="all_completed")
failed_vmhs = [i for i, state in enumerate(vmh_states) if state.is_failed()]
if failed_vmhs:
    logger.warning(f"VMH reports failed for indices: {failed_vmhs}")

try:
    bigquery_load(
        name="vmh_reports",
        table_name="vmh_reports",
        field_schema=class_map["vmh_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading VMH reports: {e}")
The functionality remains exactly the same, just with the correct import path for
wait_for_tasks
. This will still allow you to wait for all mapped tasks to complete (regardless of their success or failure state) before proceeding with the bigquery subflows. Let me know if you run into any other issues!
m
@Marvin actually it seems like "wait_for_tasks" doesn't exist on Prefect 2.
m
thought for 52.3 seconds
I apologize for my previous incorrect answers. In Prefect 2.x, the correct way to handle this scenario is to use the task states directly. Here's how you can modify your code:
Copy code
python
from prefect import task, flow
from prefect.states import State

# Your mapped tasks with return_state=True
hur_reports = extract_hur_reports.map(
    name=unmapped("hur_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True
)

vmh_reports = extract_vmh_reports.map(
    name=unmapped("vmh_reports"),
    credentials=unmapped(usage_meter_credentials),
    db_file=unmapped(db_file),
    region=region,
    data=agent_reports,
    return_state=True
)

# Check states of hur_reports
failed_hurs = [i for i, state in enumerate(hur_reports) if state.is_failed()]
if failed_hurs:
    logger.warning(f"HUR reports failed for indices: {failed_hurs}")

# Proceed with bigquery_load
try:
    bigquery_load(
        name="hur_reports",
        table_name="hur_reports",
        field_schema=class_map["hur_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["hur_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading HUR reports: {e}")

# Check states of vmh_reports
failed_vmhs = [i for i, state in enumerate(vmh_reports) if state.is_failed()]
if failed_vmhs:
    logger.warning(f"VMH reports failed for indices: {failed_vmhs}")

try:
    bigquery_load(
        name="vmh_reports",
        table_name="vmh_reports",
        field_schema=class_map["vmh_reports"].avro_schema_to_python(),
        table_config=TABLE_CONFIG["vmh_reports"].model_dump(),
        gcp_bucket=gcp_bucket,
        sa_project_id=sa_project_id,
        bq_project_id=bq_project_id,
        env=env,
        is_merge=True,
        db_file=db_file,
    )
except Exception as e:
    logger.error(f"Error loading VMH reports: {e}")
The key points in this approach: 1. Set
return_state=True
on your mapped tasks 2. The mapped tasks will return their states directly 3. You can check the states to see which tasks failed 4. The flow will naturally wait for all mapped tasks to complete before moving on 5. The bigquery_load tasks will run regardless of any failures in the mapped tasks This achieves your goal of waiting for all mapped tasks to complete (even if some fail) before running the subsequent bigquery_load tasks.