does anyone have an example of using "on_failure" ...
# ask-community
j
does anyone have an example of using "on_failure" in a flow argument to re-run the whole flow after a certain period? I see the examples for specific tasks; I'm guessing it would work the same way for a flow?
k
Hey @Joseph Loss, which part of the docs are you looking at?
What kind of period are you thinking for the flow re-run?
j
@Kevin Kho trying to piece together two parts of the docs: https://docs.prefect.io/core/concepts/states.html#state-types combined with the on_failure callable function within the flow args: https://docs.prefect.io/api/latest/core/flow.html
Just a few hours, maybe 3 max. We have this price loader that runs at 1am; some days it is successful, and other days we'll get a failure if the max_date downloaded <= max_date of the database (we want to make sure we have yesterday's close prices at the start of day).
Upon re-running a few hours later, that failed flow will run successfully. So I'm trying to find a way to automate this, otherwise I'll be getting up at 7am to re-run any failures if they occur.
k
gotcha, I think what you need to do is make a function with that signature for on_failure. If it fails, hit the GraphQL API and create a new flow run at the schedule you want.
Use the Prefect Client like this and use the create_flow_run mutation.
You can use str(pendulum.now("US/Pacific").add(hours=3)) for the start time with that mutation.
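For reference, a minimal sketch of the handler Kevin is describing (the flow ID, timezone, and offset are placeholders; the full working version Joseph shares below follows the same pattern):

import pendulum
import prefect

def reschedule_on_failure(task, old_state, new_state):
    # when the run fails, create a new run of the same flow a few hours out
    if new_state.is_failed():
        client = prefect.Client()  # assumes auth is already configured
        client.create_flow_run(
            flow_id="YOUR_FLOW_ID",
            scheduled_start_time=pendulum.now("US/Pacific").add(hours=3),
        )
    return new_state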
j
perfect dude I'm gonna take a crack at this tonight!
This is exactly what I needed, thank you so much
👍 1
@Kevin Kho is the G.O.A.T. For anyone interested, here's how to do it:
import pandas as pd
import pendulum
import prefect
import requests
from prefect import task
from prefect.engine import signals

slack_hook = "SLACK_WEBHOOK_URL"  # your Slack incoming-webhook URL


# state handler: signature is (task, old_state, new_state)
def alert_on_special_failure(task, old_state, new_state):
    logger = prefect.context.get('logger')

    if new_state.is_failed():
        if getattr(new_state.result, "flag", False) is True:
            logger.info("This is a test of the new data validation notification!")
            logger.info("Max date: {}".format(new_state.result.value))

            msg = "Task {} failed. Max date = {}".format(task.name, new_state.result.value)
            requests.post(slack_hook, json={"text": msg})

            # log in to the client and reschedule the flow
            client = prefect.Client(api_token="API_KEY")
            client.login_to_tenant(tenant_slug='TENANT_SLUG', tenant_id='TENANT_ID')
            tiingaFlowID = 'original_flow_ID'

            # create a new run of the same flow, scheduled 3 hours from now
            restartTime = pendulum.now('America/Chicago').add(hours=3)
            retryID = client.create_flow_run(
                flow_id=tiingaFlowID,
                scheduled_start_time=restartTime,
                run_name='RETRY-TIINGA-DATA-LOADER',
                parameters={"start_date": pendulum.now('America/Chicago').subtract(days=2).strftime('%Y-%m-%d')}
            )
            # get new scheduled flow run info
            flow_info = client.get_flow_run_info(retryID)
            new_flow_name = flow_info.name
            new_flow_state = flow_info.state
            new_flow_scheduled_time = flow_info.scheduled_start_time.strftime('%Y-%m-%d %I:%M:%S %p')

            # notify slack of the reschedule
            msg = "\n\nFlow: {}\nState: {}\nStart: {}".format(new_flow_name, new_flow_state, new_flow_scheduled_time)
            requests.post(slack_hook, json={"text": msg})
    return new_state


# --------------------------------------------------------------------------------------------------
# validate data is current and not stale

@task(state_handlers = [alert_on_special_failure])
def fnValidateLastDate(df):

    logger = prefect.context.get('logger')

    q = """SELECT max(date) as date from defaultdb.tblsecuritypricestiinga"""
    conn = fnOdbcConnect('defaultDSN')
    dfCheck = pd.read_sql_query(q, conn)

    dfCheck['date'] = pd.to_datetime(dfCheck['date']).dt.strftime('%Y-%m-%d')

    if df['date'].max() <= dfCheck['date'].max():
        # attach custom attributes to the FAIL signal so the state handler can pick them up
        fail_signal = signals.FAIL("Tiinga data is stale!! Max date: {}".format(df['date'].max()))
        fail_signal.flag = True
        fail_signal.value = df['date'].max()
        raise fail_signal

    else:
        df.fillna(0,inplace=True)
        return df
k
Thank you for the praise. 😆 Glad you figured it out. Could you move the code into a thread since it’s quite a big block? And I guess it isn’t a task? I’ll think about where we can place it
j
yeah I realized it was way too massive after I posted it lol my fault
maybe in the API docs as an example, under the GraphQL section you sent over
👍 1
k
Nicely done!
j
@Kevin Kho I'm super close but there's one issue: when I register the flow, that changes the flow ID to re-run. So right now I'm trying the version group ID, but I'm pretty sure that is going to change when I register this flow as well.
tiingaVersionGroupID = '8afd1d50-37fa-427e-93b5-7be1422cdb8d'

restartTime = pendulum.now('America/Chicago').add(hours=2)

retryID = client.create_flow_run(
    version_group_id=tiingaVersionGroupID,
    scheduled_start_time=restartTime,
    run_name='RETRY-TIINGA-DATA-LOADER',
    parameters={"start_date": pendulum.now('America/Chicago').subtract(days=10).strftime('%Y-%m-%d')}
)
am I supposed to use an idempotency key to circumvent this?
k
Oh what? Does it create a new flow or bump up a version?
j
bump a version, since flows like this are going to be pretty permanent tasks that we run daily
k
I see, will look into it
j
Thank you sir!
@Kevin Kho hold on, I might have beat you to it
from prefect.tasks.prefect import StartFlowRun
f = StartFlowRun(flow_name='daily-tiinga-loader', project_name='sraPROD', run_name='TEST-RETRY')
you can check this using f.serialize(), and then calling f.run() returns the flow ID of the newly scheduled flow
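A quick way to sanity-check that (per the report above that f.run() hands back the ID of the newly created run):

from prefect.tasks.prefect import StartFlowRun

f = StartFlowRun(flow_name='daily-tiinga-loader', project_name='sraPROD', run_name='TEST-RETRY')
print(f.serialize())     # inspect the serialized task definition
new_run_id = f.run()     # creates a flow run against the registered flow, looked up by name
print(new_run_id)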
k
Oh I know what you’re saying now
You might need to do a GraphQL query by flow name and project and get the flow id from there to feed into your create_flow_run
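A hedged sketch of that lookup using the Client's graphql method; the filter fields below are an assumption about the Cloud/Server schema, so adjust them to whatever the interactive API explorer shows:

import prefect

client = prefect.Client()

# look up the current (unarchived) flow by name and project, then reuse its id
result = client.graphql(
    """
    query {
      flow(where: {
        name: {_eq: "daily-tiinga-loader"},
        project: {name: {_eq: "sraPROD"}},
        archived: {_eq: false}
      }) {
        id
      }
    }
    """
)
flow_id = result.data.flow[0].id  # feed this into client.create_flow_run(flow_id=flow_id, ...)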
j
Using the Prefect StartFlowRun task, it is working. Is there something I should be concerned about with using this? i.e. do I still have to do a GraphQL query if this is working?
k
Oh I see, yeah that’s the better solution I think
j
thanks dude! Yeah this flow was rescheduled correctly; thankfully I can use the flow name with this method. Passing parameters, setting a scheduled start, and giving it a custom name are all working as well - we're good to go
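For anyone landing on this thread later, roughly what that final call looks like (kwarg names assumed from the 1.x StartFlowRun task, so double-check against your Prefect version):

import pendulum
from prefect.tasks.prefect import StartFlowRun

retry = StartFlowRun(
    flow_name='daily-tiinga-loader',
    project_name='sraPROD',
    run_name='RETRY-TIINGA-DATA-LOADER',
)

# schedule the retry a couple of hours out and pass parameters just like a normal run
retry.run(
    scheduled_start_time=pendulum.now('America/Chicago').add(hours=2),
    parameters={"start_date": pendulum.now('America/Chicago').subtract(days=2).strftime('%Y-%m-%d')},
)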
👍 1