raises a signal.FAIL. How do I deal with that gracefully and branch (via case or something else)?
z
Zanie
01/13/2021, 9:18 PM
Ah I see. It may be reasonable to add a config option to that takes that returns the result in failure cases as well as successes. However, you should be able to just use a state handler.
m
Marc Lipoff
01/13/2021, 9:18 PM
ok awesome
Marc Lipoff
01/13/2021, 9:19 PM
how do i make the "database insert" task dependant on the GE validation?
z
Zanie
01/13/2021, 9:21 PM
I’m looking into an example
Zanie
01/13/2021, 9:28 PM
Copy code
import prefect
PRETEND_VALIDATION_PASSED = True
def handle_failed_validation(task, old_state, new_state):
if isinstance(new_state, prefect.engine.state.ValidationFailed):
return prefect.engine.state.Success(result=new_state.result)
return new_state
@prefect.task(state_handlers=[handle_failed_validation])
def fail_task():
class FakeResult:
success: bool = PRETEND_VALIDATION_PASSED
if PRETEND_VALIDATION_PASSED:
return FakeResult()
raise prefect.engine.signals.VALIDATIONFAIL(result=FakeResult())
@prefect.task()
def check_ge_result(result):
return result.success
@prefect.task(log_stdout=True)
def log(message):
print(message)
with prefect.Flow("test") as flow:
result = fail_task()
validation_passed = check_ge_result(result)
with prefect.case(validation_passed, True):
log("Handle validation success using result")
with prefect.case(validation_passed, False):
log("Validation failed, log it or whatever")
flow.run()
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.