Matheus
03/07/2025, 12:30 PM
```python
gcs_to_bq = gcs_to_bigquery(
    name=table_name,
    project_id=project_id,
    dataset_id=source_dataset,
    table_id=source_table,
    write_disposition="WRITE_EMPTY",
    gcs_uri=gcs_path,
    wait_for=[load_gcs],
)
run_proc = run_procedure(
    name=table_name,
    project_id=project_id,
    procedure_dataset=procedure_dataset,
    procedure_name=merge_raw,
    source_dataset=source_dataset,
    source_table=source_table,
    target_dataset=target_dataset,
    target_table=target_table,
    primary_key=primary_key,
    all_fields=all_fields,
    wait_for=[gcs_to_bq],
)
truncate_task = run_truncate_table(
    name=table_name,
    project_id=project_id,
    dataset_id=source_dataset,
    table_id=source_table,
    wait_for=[run_proc],
)
```
Marvin
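03/07/2025, 12:30 PM
To ensure that `run_truncate_table` runs even if the `run_procedure` task fails, you can use the `allow_failure` utility in Prefect 3.x, imported from the top-level `prefect` package. Wrapping an upstream task future in `allow_failure` inside `wait_for` lets the downstream task run even when that upstream task has failed. It needs a future to wrap, so start `run_procedure` with `.submit()`: a direct call raises the task's exception in the flow before `allow_failure` is ever reached. Here's how you can modify your flow:
```python
from prefect import flow, task, allow_failure


@task
def gcs_to_bigquery(**kwargs):
    # Task implementation (**kwargs stands in for your real parameters)
    pass


@task
def run_procedure(**kwargs):
    # Task implementation
    pass


@task
def run_truncate_table(**kwargs):
    # Task implementation
    pass


@flow
def my_flow(table_name, project_id, source_dataset, source_table, gcs_path,
            procedure_dataset, merge_raw, target_dataset, target_table,
            primary_key, all_fields):
    # .submit() returns a PrefectFuture, which allow_failure can wrap
    load_gcs = gcs_to_bigquery.submit(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        write_disposition="WRITE_EMPTY",
        gcs_uri=gcs_path,
    )
    run_proc = run_procedure.submit(
        name=table_name,
        project_id=project_id,
        procedure_dataset=procedure_dataset,
        procedure_name=merge_raw,
        source_dataset=source_dataset,
        source_table=source_table,
        target_dataset=target_dataset,
        target_table=target_table,
        primary_key=primary_key,
        all_fields=all_fields,
        wait_for=[load_gcs],
    )
    # Let run_truncate_table start even if run_proc ends in a Failed state
    run_proc = allow_failure(run_proc)
    truncate_task = run_truncate_table.submit(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        wait_for=[run_proc],
    )
    # Wait for the truncation to finish before the flow returns
    truncate_task.wait()


# Call my_flow with your own table and dataset names as keyword arguments.
```
This way, `run_truncate_table` will execute regardless of whether `run_procedure` succeeds or fails. You can find more about this utility and other task management features in the Prefect documentation.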
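To make the rule concrete, here is a Prefect-free sketch that mimics it with `concurrent.futures`: a downstream callable is skipped when an upstream future in its `wait_for` list failed, unless that future is wrapped in a marker. `AllowFailure` and `run_after` are hypothetical names for illustration only; they are not Prefect's implementation, which tracks task run states rather than returning strings.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class AllowFailure:
    """Hypothetical stand-in for prefect.allow_failure: marks an upstream future."""

    def __init__(self, future: Future):
        self.future = future


def run_after(wait_for, fn):
    """Call fn once every upstream future is done.

    Returns "NotReady" without calling fn if an unwrapped upstream failed.
    """
    for dep in wait_for:
        allowed = isinstance(dep, AllowFailure)
        fut = dep.future if allowed else dep
        # exception() blocks until the future finishes and never re-raises
        if fut.exception() is not None and not allowed:
            return "NotReady"
    return fn()


def run_procedure():
    raise RuntimeError("merge failed")


def run_truncate_table():
    return "truncated"


with ThreadPoolExecutor() as pool:
    run_proc = pool.submit(run_procedure)
    blocked = run_after([run_proc], run_truncate_table)
    allowed = run_after([AllowFailure(run_proc)], run_truncate_table)

print(blocked, allowed)  # NotReady truncated
```

The plain dependency blocks the truncation, while the wrapped one lets it run after the failed procedure.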
Additionally, the Prefect API overview lists the other available functions and utilities.
Matheus
03/07/2025, 12:31 PM
Marvin
03/07/2025, 12:31 PM
To ensure that `run_truncate_table` executes even if `run_procedure` fails, you can use the `allow_failure` utility in Prefect 3.x. Wrapping an upstream task future in `allow_failure` inside `wait_for` lets the downstream task proceed even when that upstream task has failed. Because it wraps a future, `run_procedure` must be started with `.submit()`; a direct call would raise its exception in the flow first. Here's how you can adjust your flow:
```python
from prefect import flow, task, allow_failure


@task
def gcs_to_bigquery(**kwargs):
    # Task implementation (**kwargs stands in for your real parameters)
    pass


@task
def run_procedure(**kwargs):
    # Task implementation
    pass


@task
def run_truncate_table(**kwargs):
    # Task implementation
    pass


@flow
def my_flow(table_name, project_id, source_dataset, source_table, gcs_path,
            procedure_dataset, merge_raw, target_dataset, target_table,
            primary_key, all_fields):
    # .submit() returns a PrefectFuture; add wait_for=[...] here if an
    # earlier task (such as your load_gcs step) must finish first
    gcs_to_bq = gcs_to_bigquery.submit(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        write_disposition="WRITE_EMPTY",
        gcs_uri=gcs_path,
    )
    run_proc = run_procedure.submit(
        name=table_name,
        project_id=project_id,
        procedure_dataset=procedure_dataset,
        procedure_name=merge_raw,
        source_dataset=source_dataset,
        source_table=source_table,
        target_dataset=target_dataset,
        target_table=target_table,
        primary_key=primary_key,
        all_fields=all_fields,
        wait_for=[gcs_to_bq],
    )
    # Let run_truncate_table start even if run_proc ends in a Failed state
    run_proc = allow_failure(run_proc)
    truncate_task = run_truncate_table.submit(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        wait_for=[run_proc],
    )
    # Wait for the truncation to finish before the flow returns
    truncate_task.wait()


# Call my_flow with your own table and dataset names as keyword arguments.
```
This ensures that `run_truncate_table` will be executed irrespective of the outcome of `run_procedure`. For further details, you can explore Prefect's task management documentation.
Matheus
03/07/2025, 12:31 PM
Marvin
03/07/2025, 12:32 PM
`wait_for` parameter. However, in Prefect 1.x you can raise the `prefect.engine.signals.FAIL` signal to explicitly mark a task run as failed and manage the flow's execution from there.
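In Prefect 1.x, raising `FAIL` only marks the task run as failed; whether a downstream task still runs is decided by its trigger. The plain-Python model below, with states reduced to the two strings "Success" and "Failed", shows the difference between the default `all_successful` trigger and `all_finished`. These are simplified re-implementations for illustration, not the functions in `prefect.triggers`.

```python
def all_successful(upstream_states):
    # Default trigger: every upstream task must have succeeded
    return all(state == "Success" for state in upstream_states)


def all_finished(upstream_states):
    # Runs once every upstream task has finished, whatever the outcome
    return all(state in {"Success", "Failed"} for state in upstream_states)


upstream = ["Failed"]  # e.g. run_procedure raised FAIL
print(all_successful(upstream), all_finished(upstream))  # False True
```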
Here's an approach to ensure `run_truncate_table` runs even if `run_procedure` fails, written for Prefect 1.x:
```python
from prefect import task, Flow
from prefect.engine.signals import FAIL
from prefect.triggers import all_finished


@task
def gcs_to_bigquery(**kwargs):
    # Task implementation (**kwargs stands in for your real parameters)
    pass


@task
def run_procedure(**kwargs):
    try:
        # Code that might fail
        pass
    except Exception as e:
        # Log the error, then mark this task run as Failed
        print(f"Procedure failed with error: {e}")
        raise FAIL("Procedure failed") from e


# all_finished: run once all upstream tasks have finished, even if they failed
@task(trigger=all_finished)
def run_truncate_table(**kwargs):
    # Task implementation
    pass


# table_name, project_id and the other names are assumed to be defined earlier
with Flow("my_flow") as flow:
    gcs_to_bq = gcs_to_bigquery(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        write_disposition="WRITE_EMPTY",
        gcs_uri=gcs_path,
    )
    run_proc = run_procedure(
        name=table_name,
        project_id=project_id,
        procedure_dataset=procedure_dataset,
        procedure_name=merge_raw,
        source_dataset=source_dataset,
        source_table=source_table,
        target_dataset=target_dataset,
        target_table=target_table,
        primary_key=primary_key,
        all_fields=all_fields,
        upstream_tasks=[gcs_to_bq],
    )
    truncate_task = run_truncate_table(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        upstream_tasks=[run_proc],
    )

flow.run()
```
In this Prefect 1.x setup, `run_procedure` raises a `FAIL` signal if it encounters an error, which marks its run as Failed. Because `run_truncate_table` uses the `all_finished` trigger, it still runs once `run_procedure` has finished; with the default `all_successful` trigger it would not. Note that `FAIL`, the `Flow` context manager, `upstream_tasks`, and triggers are Prefect 1.x APIs and are not available in Prefect 2.x or 3.x.
Matheus
03/07/2025, 12:58 PM
Marvin
03/07/2025, 12:59 PM
`FAIL` signal in Prefect 2.x. The correct approach to ensure `run_truncate_table` runs even if `run_procedure` fails is the `allow_failure` utility, which is available in Prefect's top-level API (`from prefect import allow_failure`).
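`allow_failure` wraps a task future, so the upstream task has to be submitted rather than called directly. The standard library shows the same distinction: a direct call raises at the call site, while a submitted call stores the failure on a `Future` for later inspection. This is only an analogy for `task(...)` versus `task.submit(...)`, not Prefect code; `run_procedure` here is a stand-in function.

```python
from concurrent.futures import ThreadPoolExecutor


def run_procedure():
    raise RuntimeError("merge failed")


# Direct call: the exception is raised at the call site
# (in a flow body, an uncaught error here would stop the flow)
try:
    run_procedure()
    direct_outcome = "completed"
except RuntimeError as exc:
    direct_outcome = f"raised: {exc}"

# Submitted call: the failure is stored on the future instead of raised
with ThreadPoolExecutor() as pool:
    future = pool.submit(run_procedure)
    stored_error = future.exception()  # waits, returns the error without raising

print(direct_outcome)      # raised: merge failed
print(repr(stored_error))  # RuntimeError('merge failed')
```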
Here's how you can modify your flow:
```python
from prefect import flow, task, allow_failure


@task
def gcs_to_bigquery(**kwargs):
    # Task implementation (**kwargs stands in for your real parameters)
    pass


@task
def run_procedure(**kwargs):
    # Task implementation
    pass


@task
def run_truncate_table(**kwargs):
    # Task implementation
    pass


@flow
def my_flow(table_name, project_id, source_dataset, source_table, gcs_path,
            procedure_dataset, merge_raw, target_dataset, target_table,
            primary_key, all_fields):
    # .submit() returns a PrefectFuture; add wait_for=[...] here if an
    # earlier task (such as your load_gcs step) must finish first
    gcs_to_bq = gcs_to_bigquery.submit(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        write_disposition="WRITE_EMPTY",
        gcs_uri=gcs_path,
    )
    run_proc = run_procedure.submit(
        name=table_name,
        project_id=project_id,
        procedure_dataset=procedure_dataset,
        procedure_name=merge_raw,
        source_dataset=source_dataset,
        source_table=source_table,
        target_dataset=target_dataset,
        target_table=target_table,
        primary_key=primary_key,
        all_fields=all_fields,
        wait_for=[gcs_to_bq],
    )
    # Let run_truncate_table start even if run_proc ends in a Failed state
    run_proc = allow_failure(run_proc)
    truncate_task = run_truncate_table.submit(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        wait_for=[run_proc],
    )
    # Wait for the truncation to finish before the flow returns
    truncate_task.wait()


# Call my_flow with your own table and dataset names as keyword arguments.
```
This setup submits `run_procedure` with `.submit()` so that there is a future to wrap, and uses `allow_failure` to ensure that `run_truncate_table` will execute regardless of whether `run_procedure` succeeds or fails. You can find more about this utility and other task management features in the Prefect documentation.
Matheus
03/07/2025, 2:31 PM
Marvin
03/07/2025, 2:32 PM
`result(raise_on_failure=False)` method on task futures. It waits for the task run to finish and, if the run failed, returns the exception instead of raising it, so the flow can carry on and call downstream tasks. Here's how you can modify your flow:
```python
from prefect import flow, task


@task
def gcs_to_bigquery(**kwargs):
    # Task implementation (**kwargs stands in for your real parameters)
    pass


@task
def run_procedure(**kwargs):
    # Task implementation
    pass


@task
def run_truncate_table(**kwargs):
    # Task implementation
    pass


@flow
def my_flow(table_name, project_id, source_dataset, source_table, gcs_path,
            procedure_dataset, merge_raw, target_dataset, target_table,
            primary_key, all_fields):
    # A direct call runs the task to completion (and raises if it fails)
    gcs_to_bigquery(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        write_disposition="WRITE_EMPTY",
        gcs_uri=gcs_path,
    )
    # .submit() returns a PrefectFuture, which has the result() method
    run_proc = run_procedure.submit(
        name=table_name,
        project_id=project_id,
        procedure_dataset=procedure_dataset,
        procedure_name=merge_raw,
        source_dataset=source_dataset,
        source_table=source_table,
        target_dataset=target_dataset,
        target_table=target_table,
        primary_key=primary_key,
        all_fields=all_fields,
    )
    # Wait for run_procedure; if it failed, return its exception instead of raising
    run_proc.result(raise_on_failure=False)
    # run_procedure has already finished, so no wait_for is needed here
    run_truncate_table(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
    )


# Call my_flow with your own table and dataset names as keyword arguments.
```
This approach ensures that `run_truncate_table` will execute regardless of whether `run_procedure` succeeds or fails: `.result(raise_on_failure=False)` waits for the submitted procedure without raising, and the truncation is called afterwards. You can find more about this pattern in the Prefect documentation.
Matheus
03/07/2025, 2:39 PM
Marvin
03/07/2025, 2:40 PM
The `raise_on_failure` parameter is used when you retrieve the result of a task, not when defining task dependencies. To ensure that `run_truncate_table` runs even if `run_procedure` fails, call the `result()` method on the task future with `raise_on_failure=False`. Here's how you can properly implement this:
```python
from prefect import flow, task


@task
def gcs_to_bigquery(**kwargs):
    # Task implementation (**kwargs stands in for your real parameters)
    pass


@task
def run_procedure(**kwargs):
    # Task implementation
    pass


@task
def run_truncate_table(**kwargs):
    # Task implementation
    pass


@flow
def my_flow(table_name, project_id, source_dataset, source_table, gcs_path,
            procedure_dataset, merge_raw, target_dataset, target_table,
            primary_key, all_fields):
    # A direct call runs the task to completion (and raises if it fails)
    gcs_to_bigquery(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        write_disposition="WRITE_EMPTY",
        gcs_uri=gcs_path,
    )
    run_proc_future = run_procedure.submit(
        name=table_name,
        project_id=project_id,
        procedure_dataset=procedure_dataset,
        procedure_name=merge_raw,
        source_dataset=source_dataset,
        source_table=source_table,
        target_dataset=target_dataset,
        target_table=target_table,
        primary_key=primary_key,
        all_fields=all_fields,
    )
    # Retrieve the result without raising on failure (an exception is returned)
    run_proc_result = run_proc_future.result(raise_on_failure=False)
    if isinstance(run_proc_result, BaseException):
        print(f"run_procedure failed: {run_proc_result}")
    # run_procedure has already finished, so no wait_for is needed here
    run_truncate_table(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
    )


# Call my_flow with your own table and dataset names as keyword arguments.
```
This setup ensures that `run_truncate_table` will execute regardless of whether `run_procedure` succeeds or fails. The key is to call `result(raise_on_failure=False)` when you want to continue execution despite a failure; because that call blocks until `run_procedure` has finished, the truncation that follows does not need `wait_for`.
Matheus
03/07/2025, 2:43 PM
Marvin
03/07/2025, 2:44 PM
To ensure that `run_truncate_table` runs even if `run_procedure` fails, you can combine a task state check with conditional logic within the flow. Here's how you can do it:
```python
from prefect import flow, task


@task
def gcs_to_bigquery(**kwargs):
    # Task implementation (**kwargs stands in for your real parameters)
    pass


@task
def run_procedure(**kwargs):
    # Task implementation
    pass


@task
def run_truncate_table(**kwargs):
    # Task implementation
    pass


@flow
def my_flow(table_name, project_id, source_dataset, source_table, gcs_path,
            procedure_dataset, merge_raw, target_dataset, target_table,
            primary_key, all_fields):
    # A direct call runs the task to completion (and raises if it fails)
    gcs_to_bigquery(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        write_disposition="WRITE_EMPTY",
        gcs_uri=gcs_path,
    )
    # return_state=True returns the final State instead of raising on failure
    proc_state = run_procedure(
        name=table_name,
        project_id=project_id,
        procedure_dataset=procedure_dataset,
        procedure_name=merge_raw,
        source_dataset=source_dataset,
        source_table=source_table,
        target_dataset=target_dataset,
        target_table=target_table,
        primary_key=primary_key,
        all_fields=all_fields,
        return_state=True,
    )
    if proc_state.is_failed():
        print("run_procedure failed, but continuing with run_truncate_table.")
    # Runs after run_procedure has finished, whatever its outcome
    run_truncate_table(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
    )


# Call my_flow with your own table and dataset names as keyword arguments.
```
In this setup, `return_state=True` makes `run_procedure` hand back its final state instead of raising. If that state is failed, the flow prints a message; either way it then calls `run_truncate_table`, so the truncation task executes regardless of the success or failure of the procedure task.
Matheus
03/07/2025, 2:55 PM
Marvin
03/07/2025, 2:55 PM
`allow_failure` in Prefect 3.x. However, you can also achieve the desired behavior with the `raise_on_failure=False` parameter when retrieving task results, which keeps a task failure from halting the flow.
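The idea behind `raise_on_failure=False` can be sketched with `concurrent.futures` alone: wait for the upstream work, get either its value or its exception without raising, and then run the cleanup step unconditionally. `result_or_exception` is a hypothetical helper name; in Prefect the corresponding call is `future.result(raise_on_failure=False)`.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def result_or_exception(future: Future):
    """Return the future's value, or its exception instead of raising it."""
    error = future.exception()  # blocks until the future is done
    return error if error is not None else future.result()


def run_procedure():
    raise RuntimeError("merge failed")


calls = []

with ThreadPoolExecutor() as pool:
    outcome = result_or_exception(pool.submit(run_procedure))
    if isinstance(outcome, BaseException):
        calls.append(f"procedure failed: {outcome}")
    calls.append("truncate")  # always runs, after the procedure has finished

print(calls)  # ['procedure failed: merge failed', 'truncate']
```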
Here's how you can implement it:
```python
from prefect import flow, task


@task
def gcs_to_bigquery(**kwargs):
    # Task implementation (**kwargs stands in for your real parameters)
    pass


@task
def run_procedure(**kwargs):
    # Task implementation
    pass


@task
def run_truncate_table(**kwargs):
    # Task implementation
    pass


@flow
def my_flow(table_name, project_id, source_dataset, source_table, gcs_path,
            procedure_dataset, merge_raw, target_dataset, target_table,
            primary_key, all_fields):
    # A direct call runs the task to completion (and raises if it fails)
    gcs_to_bigquery(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
        write_disposition="WRITE_EMPTY",
        gcs_uri=gcs_path,
    )
    run_proc_future = run_procedure.submit(
        name=table_name,
        project_id=project_id,
        procedure_dataset=procedure_dataset,
        procedure_name=merge_raw,
        source_dataset=source_dataset,
        source_table=source_table,
        target_dataset=target_dataset,
        target_table=target_table,
        primary_key=primary_key,
        all_fields=all_fields,
    )
    # Wait for run_procedure without raising if it failed
    run_proc_future.result(raise_on_failure=False)
    # Called after run_procedure has finished, whether it succeeded or failed
    run_truncate_table(
        name=table_name,
        project_id=project_id,
        dataset_id=source_dataset,
        table_id=source_table,
    )


# Call my_flow with your own table and dataset names as keyword arguments.
```
Because `result(raise_on_failure=False)` waits for `run_procedure` without raising, `run_truncate_table` executes afterwards whether `run_procedure` fails or succeeds. Listing the future directly in `wait_for` is not enough on its own: a failed upstream future in `wait_for` keeps the downstream task from running unless it is wrapped in `allow_failure`.