Gunnar Oehmichen
07/29/2024, 9:39 AMsqlalchemy.exc.IntegrityError
. The imported function looks as following:
def insert_bla(...): # some parameters
# some function logic
with SessionDB() as session:
with session.begin():
try:
session.execute(sql_insert)
session.commit()
print(f"{TableMainSupabase.__table__.name}: Inserted new {len(df_open_mastr_main_insert)} rows into supabase.")
except Exception as e:
session.rollback()
raise e
In the task this of course would then look like:
from utils import insert_bla
@task
... # some task-logic, preprocessing
def update_insert_table_bla(...):
# some preprocessing
insert_bla(df_bla,
...). # some connection parameters
So what I guess, I'd need to to, is again put the call of insert_bla() into a try except to catch the specific error and then, how to point to another task?Marvin
07/29/2024, 9:40 AMpython
from sqlalchemy.exc import IntegrityError
from prefect import flow, task
from utils import insert_bla
@task
def update_insert_table_bla(df_bla, *args, **kwargs):
try:
insert_bla(df_bla, *args, **kwargs)
except IntegrityError:
handle_integrity_error_task(df_bla) # Trigger another task on exception
except Exception as e:
raise e
@task
def handle_integrity_error_task(df_bla):
# Define logic to handle IntegrityError
print(f"Handling IntegrityError for data: {df_bla}")
@flow
def main_flow():
df_bla = ... # Your dataframe or data
update_insert_table_bla(df_bla, ...)
if __name__ == "__main__":
main_flow()
2. Explanation:
- update_insert_table_bla
Task: This task attempts to execute the insert_bla
function. If it catches an IntegrityError
, it triggers the handle_integrity_error_task
.
- handle_integrity_error_task
Task: This task defines the logic to handle the IntegrityError
exception.
- main_flow
Flow: This is your main flow that orchestrates the execution of tasks.
For more information on handling state and exceptions in Prefect, see these links:
- State Handling and Exceptions: Prefect State Handling
- Task Retry Conditions: Custom Retry Handler
If you need further customization or examples, let me know.Gunnar Oehmichen
07/29/2024, 9:58 AMMarvin
07/29/2024, 9:59 AMpython
from sqlalchemy.exc import IntegrityError
from prefect import flow, task
from prefect.states import Failed, State
@task
def insert_bla_task(df_bla, *args, **kwargs):
try:
insert_bla(df_bla, *args, **kwargs)
except IntegrityError as e:
raise e from None # To ensure the state captures the exception
except Exception as e:
raise e from None
@task
def handle_integrity_error_task(df_bla):
print(f"Handling IntegrityError for data: {df_bla}")
@task
def conditional_task(state: State, df_bla):
if isinstance(state.result(), IntegrityError):
handle_integrity_error_task(df_bla)
else:
return state.result()
@flow
def main_flow():
df_bla = ... # Your dataframe or data
state = insert_bla_task(df_bla, ..., return_state=True)
conditional_task(state, df_bla)
if __name__ == "__main__":
main_flow()
### Explanation:
1. insert_bla_task
Task: Attempts to execute and raises exceptions to influence state.
2. handle_integrity_error_task
Task: Handles the IntegrityError
separately.
3. conditional_task
Task: Takes the state output and triggers the error-handling task if an IntegrityError
is encountered.
4. main_flow
Flow: Manages the task execution and the conditional logic using states.
Documentation References:
- Working with States: Prefect States
- Handling Task Failures: Handling Task Failures
Using states effectively leverages Prefect’s full potential, streamlining error handling and making your workflows more robust and maintainable.