dammy arinde

01/24/2022, 3:13 PM
Hi all! We have a staging table in Snowflake where we want to update the data validation status of files. We are using Great_expectations to handle the validations and we see the results under the artifact in the UI. If the validation fails, the flow fails and the log shows failed in the UI. 1) Is there a way to get the status of a flow (FAILED or SUCCESS) in a prefect task so we can update the Snowflake table from the task? 2) Is there a way to get the great_expectations result in a prefect task? Thanks!


01/24/2022, 3:23 PM
Hey dammy! Are you using the Great Expectations task from the Prefect task library?

Kevin Kho

01/24/2022, 3:24 PM
Hey @dammy arinde, I see what you are saying. Just a couple of ideas here. You might have to modify the GreatExpectations task in order to do that. You can’t get the status of a Flow from a Task because the Flow is still ongoing if there are Tasks running by definition. It seems like you can pull the results from the GreatExpectations task. If it is successful, it returns the result. If it fails, it can be accessed by doing
on the
exception. So if you are using a state handler, you can do:
Copy code
def state_handler(obj: Union[Task, Flow], old_state: State, new_state: State) -> Optional[State]:
    res = new_state.result
and then attach this to your task. And then you then put the logic to update Snowflake here with the result. If you are using a Task, you could use a downstream task to handle it:
Copy code
def snowflake_task(ge_output):
    if isinstance(ge_output, FAIL):
        res = ge_output.result
If you are using a downstream task, you need to set

dammy arinde

01/24/2022, 4:03 PM
@alex yes we are using the great expectations task from Prefect, we were hoping to be able to get the complete results from GE and update snowflake tables with the failed records.
@Kevin Kho Thanks Kevin, I will try the downstream task option.