Gunnar Oehmichen
07/29/2024, 9:39 AM
sqlalchemy.exc.IntegrityError. The imported function looks as follows:
def insert_bla(...):  # some parameters
    # some function logic
    with SessionDB() as session:
        with session.begin():
            try:
                session.execute(sql_insert)
                session.commit()
                print(f"{TableMainSupabase.__table__.name}: Inserted new {len(df_open_mastr_main_insert)} rows into supabase.")
            except Exception as e:
                session.rollback()
                raise e
In the task this of course would then look like:
from utils import insert_bla

# ... some task logic, preprocessing

@task
def update_insert_table_bla(...):
    # some preprocessing
    insert_bla(df_bla,
               ...)  # some connection parameters
So I guess what I'd need to do is again put the call of insert_bla() into a try/except to catch the specific error. But then, how do I point to another task?
Marvin
07/29/2024, 9:40 AM
from sqlalchemy.exc import IntegrityError
from prefect import flow, task
from utils import insert_bla


@task
def update_insert_table_bla(df_bla, *args, **kwargs):
    try:
        insert_bla(df_bla, *args, **kwargs)
    except IntegrityError:
        handle_integrity_error_task(df_bla)  # Trigger another task on exception
    except Exception as e:
        raise e


@task
def handle_integrity_error_task(df_bla):
    # Define logic to handle IntegrityError
    print(f"Handling IntegrityError for data: {df_bla}")


@flow
def main_flow():
    df_bla = ...  # Your dataframe or data
    update_insert_table_bla(df_bla, ...)


if __name__ == "__main__":
    main_flow()
2. Explanation:
- update_insert_table_bla task: This task attempts to execute the insert_bla function. If it catches an IntegrityError, it triggers the handle_integrity_error_task.
- handle_integrity_error_task task: This task defines the logic to handle the IntegrityError exception.
- main_flow flow: This is your main flow that orchestrates the execution of tasks.
For more information on handling state and exceptions in Prefect, see these links:
- State Handling and Exceptions: Prefect State Handling
- Task Retry Conditions: Custom Retry Handler
If you need further customization or examples, let me know.
Gunnar Oehmichen
07/29/2024, 9:58 AM
Marvin
07/29/2024, 9:59 AM
from sqlalchemy.exc import IntegrityError
from prefect import flow, task
from prefect.states import State
from utils import insert_bla


@task
def insert_bla_task(df_bla, *args, **kwargs):
    try:
        insert_bla(df_bla, *args, **kwargs)
    except IntegrityError:
        raise  # Re-raise so the task run ends in a failed state carrying the exception


@task
def handle_integrity_error_task(df_bla):
    print(f"Handling IntegrityError for data: {df_bla}")


# Plain helper rather than a task, so it receives the State object itself
def conditional_task(state: State, df_bla):
    # raise_on_failure=False returns the stored exception instead of re-raising it
    if state.is_failed() and isinstance(state.result(raise_on_failure=False), IntegrityError):
        handle_integrity_error_task(df_bla)
    else:
        return state.result()  # Re-raises any other failure


@flow
def main_flow():
    df_bla = ...  # Your dataframe or data
    state = insert_bla_task(df_bla, ..., return_state=True)
    conditional_task(state, df_bla)


if __name__ == "__main__":
    main_flow()
### Explanation:
1. insert_bla_task: Attempts the insert and lets any exception propagate, so the task run ends in a failed state.
2. handle_integrity_error_task: Handles the IntegrityError separately.
3. conditional_task: Takes the state returned by insert_bla_task and triggers the error-handling task if that state holds an IntegrityError.
4. main_flow: Manages the task execution and the conditional logic using states.
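The state-based idea in the list above (run the step, keep its outcome as an object, branch on that object in the flow) can be sketched without Prefect or a database. This is only an illustration under stated assumptions: `Outcome`, `run_capturing`, the local `IntegrityError` class, and the set-based `insert_bla` are hypothetical stand-ins, not Prefect or SQLAlchemy APIs. In Prefect itself the corresponding pieces would be calling the task with `return_state=True` and reading the exception with `state.result(raise_on_failure=False)`, which returns the exception instead of re-raising it.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Set


class IntegrityError(Exception):
    """Stand-in for sqlalchemy.exc.IntegrityError (assumption: keeps the sketch dependency-free)."""


@dataclass
class Outcome:
    """Hypothetical stand-in for a task state: holds either a value or the raised exception."""
    value: Any = None
    error: Optional[BaseException] = None

    def is_failed(self) -> bool:
        return self.error is not None


def run_capturing(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Outcome:
    """Run fn and capture its exception instead of letting it propagate."""
    try:
        return Outcome(value=fn(*args, **kwargs))
    except Exception as exc:
        return Outcome(error=exc)


def insert_bla(rows: List[int], existing: Set[int]) -> int:
    """Hypothetical insert: rejects the whole batch if any key already exists."""
    duplicates = [r for r in rows if r in existing]
    if duplicates:
        raise IntegrityError(f"duplicate keys: {duplicates}")
    existing.update(rows)
    return len(rows)


def handle_integrity_error(rows: List[int], existing: Set[int]) -> int:
    """Hypothetical fallback: insert only the rows that are not present yet."""
    new_rows = [r for r in rows if r not in existing]
    existing.update(new_rows)
    return len(new_rows)


def main_flow(rows: List[int], existing: Set[int]) -> int:
    """Branch on the captured outcome at the flow level."""
    outcome = run_capturing(insert_bla, rows, existing)
    if not outcome.is_failed():
        return outcome.value
    if isinstance(outcome.error, IntegrityError):
        return handle_integrity_error(rows, existing)
    raise outcome.error  # Any other failure is re-raised unchanged
```

With `existing = {1, 2}`, calling `main_flow([2, 3], existing)` takes the fallback path and inserts only `3`. Keeping the branch in the flow, rather than inside another task, means the decision is made where the outcome object is directly available.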
Documentation References:
- Working with States: Prefect States
- Handling Task Failures: Handling Task Failures
Using states effectively leverages Prefect’s full potential, streamlining error handling and making your workflows more robust and maintainable.