https://prefect.io logo
Title
h

Hedgar

03/04/2022, 6:59 PM
My flow failed and when I checked the error is the exception I inserted to check for completeness of data being extracted. I Actually did it because in the past without this check incomplete data got extracted. Could it be the way I raised the ValueError exception? Is there's a way in prefect to do exception that would suit my case? and secondly is there a way I can link this exception to a retry mechanism? That is if this occurred the flow could be seen as failed and therefore retry
k

Kevin Kho

03/04/2022, 7:31 PM
I think raising the error is fine but you can also use
raise FAILED()
to fail the task. We don’t have retries on the flow level but we do have for task level if you want that
h

Hedgar

03/05/2022, 2:03 PM
Maybe if I elaborate it would be clearer. The following is my code structure:
@task(name=‘extract symbol’)
def extract_symbol(): Something here… @task(name=“extract index”) def extract_index(): Something here … @task(name=“combine”) def combine(sym indx ): coby = pd.merge(sym,inx) if coby[0][-1] == “MUKY”: coby else: raise ValueError(“there has been an incomplete data extract”) with Flow(): syb = extract_symbol() inx = extract_index() combine(syb, inx) what strategy can I use to ensure that the whole code run again during retry when it failed?
k

Kevin Kho

03/05/2022, 2:09 PM
I know what you are saying. If something fails, how do you trigger the upstream as well right? It’s pretty hard in current Prefect because you probably need to do something with the GraphQL API to mark the previous task as Failed as well so that it also retries. This is a first class feature in Prefect 2.0 though with first-class subflows
So your best approach at the moment is using a State Handler to hit the graphQL API and get the task id. and then you do another one with the
set_task_run_state
mutation in order to mark it as Failed
h

Hedgar

03/05/2022, 7:35 PM
I decided to use prefect signals
from prefect.engine import signals
if coby[0][-1] != “MURKY”:
        raise signals.RETRY()
else:
      coby
according to the docs this would force a retry?
k

Kevin Kho

03/05/2022, 8:55 PM
Oh I thought you wanted to try a previous task as well. Yes this will force a retry but if the task has a retries configured and it fails, that will retry as well
h

Hedgar

03/06/2022, 8:13 AM
Yeah I thought so but when I look at the code closely I realize the task wherein I need the check is the FIRST task, so there's no preceding task to it.