Marc Lipoff
01/13/2021, 8:22 PM
RunGreatExpectationsValidation tasks, what's the best way to grab the validation results and log them? It doesn't appear I can do it with a state handler.
Zanie
case
https://docs.prefect.io/api/latest/tasks/control_flow.html#case ?
Marc Lipoff
01/13/2021, 9:13 PM
RunGreatExpectationsValidation raises a signal.FAIL. How do I deal with that gracefully and branch (via case or something else)?
Zanie
Marc Lipoff
01/13/2021, 9:18 PM
Zanie
import prefect

# Toggle to simulate a passing or failing validation.
PRETEND_VALIDATION_PASSED = True


def handle_failed_validation(task, old_state, new_state):
    # Convert a ValidationFailed state into Success, keeping the result,
    # so downstream tasks still run and can inspect it.
    if isinstance(new_state, prefect.engine.state.ValidationFailed):
        return prefect.engine.state.Success(result=new_state.result)
    return new_state


@prefect.task(state_handlers=[handle_failed_validation])
def fail_task():
    # Stand-in for the validation task.
    class FakeResult:
        success: bool = PRETEND_VALIDATION_PASSED

    if PRETEND_VALIDATION_PASSED:
        return FakeResult()
    raise prefect.engine.signals.VALIDATIONFAIL(result=FakeResult())


@prefect.task()
def check_ge_result(result):
    return result.success


@prefect.task(log_stdout=True)
def log(message):
    print(message)


with prefect.Flow("test") as flow:
    result = fail_task()
    validation_passed = check_ge_result(result)
    # Branch on the boolean extracted from the validation result.
    with prefect.case(validation_passed, True):
        log("Handle validation success using result")
    with prefect.case(validation_passed, False):
        log("Validation failed, log it or whatever")

flow.run()
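To see what the state handler is doing without running a flow, here is a rough, dependency-free sketch of the same logic. The State, Success, Failed, and ValidationFailed classes, FakeValidationResult, and the branch helper are all hypothetical stand-ins written for illustration; they are not Prefect's classes, and the real engine does much more than this.

```python
from dataclasses import dataclass
from typing import Any


# Hypothetical stand-ins for state classes (assumption: NOT Prefect's API).
@dataclass
class State:
    result: Any = None


class Success(State):
    pass


class Failed(State):
    pass


class ValidationFailed(Failed):
    pass


@dataclass
class FakeValidationResult:
    # Mirrors the `success` attribute used in the flow above.
    success: bool


def handle_failed_validation(task, old_state, new_state):
    """Turn a ValidationFailed state into Success, keeping its result."""
    if isinstance(new_state, ValidationFailed):
        return Success(result=new_state.result)
    return new_state


def branch(state):
    """Mimic the two case blocks by branching on result.success."""
    if not isinstance(state, Success):
        return "task failed for another reason"
    if state.result.success:
        return "validation passed"
    return "validation failed, logged and handled"
```

The point of the pattern is that only the validation failure is rewritten to a success, so the result stays available for branching, while any other failure passes through the handler unchanged.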
Marc Lipoff
01/13/2021, 9:30 PM