    Joseph Loss
    1 year ago
    does anyone have an example of using "on_failure" in a flow argument to re-run the whole flow after a certain period? I see the examples for specific tasks; I'm guessing it would work the same way for a flow?
    Kevin Kho
    1 year ago
    Hey @Joseph Loss, which part of the docs are you looking at?
    What kind of period are you thinking for the flow re-run?
    Joseph Loss
    1 year ago
    @Kevin Kho trying to piece together two parts of the docs: https://docs.prefect.io/core/concepts/states.html#state-types combined with the on_failure callable function within the flow args: https://docs.prefect.io/api/latest/core/flow.html
    Just a few hours, maybe 3 max. We have this price loader that runs at 1am; some days it is successful and other days we'll get a failure if the max_date downloaded <= max_date of the database (we want to make sure we have yesterday's close prices at the start of day).
    Upon re-running a few hours later, that failed flow will run successfully. So I'm trying to find a way to automate this, otherwise I'll be getting up at 7am to re-run any failures if they occur.
    Kevin Kho
    1 year ago
    Gotcha, I think what you need to do is make a function with that signature for on_failure. If the flow fails, hit the GraphQL API and create a new flow run at the schedule you want.
    Use the Prefect Client like this and use the create_flow_run mutation. You can use str(pendulum.now("US/Pacific").add(hours=3)) for the start time with that mutation.
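    Something like this, as a rough sketch (assuming Prefect 1.x with a Cloud/Server backend; FLOW_ID and the run name are placeholders):
    import pendulum
    from prefect import Client

    # flow-level on_failure callback: signature is (flow, state)
    def reschedule_in_three_hours(flow, state):
        if state.is_failed():
            client = Client()
            # maps to the create_flow_run GraphQL mutation
            client.create_flow_run(
                flow_id="FLOW_ID",
                scheduled_start_time=pendulum.now("US/Pacific").add(hours=3),
                run_name="retry-after-failure",
            )

    # attach it via the flow argument:
    # with Flow("price-loader", on_failure=reschedule_in_three_hours) as flow:
    #     ...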
    Joseph Loss
    1 year ago
    perfect dude I'm gonna take a crack at this tonight!
    This is exactly what I needed, thank you so much
    @Kevin Kho is G.O.A.T. For anyone interested, here's how to do it:
    import pandas as pd
    import pendulum
    import prefect
    import requests
    from prefect import task
    from prefect.engine import signals

    # slack_hook (a Slack webhook URL) and fnOdbcConnect (a DB connection helper)
    # are defined elsewhere in the project.

    def alert_on_special_failure(task, old_state, new_state):
        logger = prefect.context.get('logger')

        if new_state.is_failed():
            if getattr(new_state.result, "flag", False) is True:
                logger.info("This is a test of the new data validation notification!")
                logger.info("Max date: {}".format(new_state.result.value))

                msg = "Task {} failed. Max date = {}".format(task.name, new_state.result.value)
                requests.post(slack_hook, json={"text": msg})

                # log in to the client and reschedule the flow
                client = prefect.Client(api_token="API_KEY")
                client.login_to_tenant(tenant_slug='TENANT_SLUG', tenant_id='TENANT_ID')
                tiingaFlowID = 'original_flow_ID'

                restartTime = pendulum.now('America/Chicago').add(hours=3)
                retryID = client.create_flow_run(
                    flow_id=tiingaFlowID,
                    scheduled_start_time=restartTime,
                    run_name='RETRY-TIINGA-DATA-LOADER',
                    parameters={"start_date": pendulum.now('America/Chicago').subtract(days=2).strftime('%Y-%m-%d')}
                )

                # get the new scheduled flow run's info
                flow_info = client.get_flow_run_info(retryID)
                new_flow_name = flow_info.name
                new_flow_state = flow_info.state
                new_flow_scheduled_time = flow_info.scheduled_start_time.strftime('%Y-%m-%d %H:%M:%S %p')

                # notify Slack of the reschedule
                msg = "\n\nFlow: {}\nState: {}\nStart: {}".format(new_flow_name, new_flow_state, new_flow_scheduled_time)
                requests.post(slack_hook, json={"text": msg})
        return new_state


    # --------------------------------------------------------------------------------------------------
    # validate that the data is current and not stale

    @task(state_handlers=[alert_on_special_failure])
    def fnValidateLastDate(df):
        logger = prefect.context.get('logger')

        q = """SELECT max(date) as date from defaultdb.tblsecuritypricestiinga"""
        conn = fnOdbcConnect('defaultDSN')
        dfCheck = pd.read_sql_query(q, conn)

        dfCheck['date'] = pd.to_datetime(dfCheck['date']).dt.strftime('%Y-%m-%d')

        if df['date'].max() <= dfCheck['date'].max():
            # raise a FAIL signal carrying the max date so the state handler can report it
            fail_signal = signals.FAIL("Tiinga data is stale!! Max date: {}".format(df['date'].max()))
            fail_signal.flag = True
            fail_signal.value = df['date'].max()
            raise fail_signal
        else:
            df.fillna(0, inplace=True)
            return df
    Kevin Kho
    1 year ago
    Thank you for the praise. 😆 Glad you figured it out. Could you move the code into a thread since it’s quite a big block? And I guess it isn’t a task? I’ll think about where we can place it
    Joseph Loss
    1 year ago
    yeah I realized it was way too massive after I posted it lol, my fault
    maybe in the API docs as an example, under the GraphQL section you sent over
    Kevin Kho
    1 year ago
    Nicely done!
    Joseph Loss
    1 year ago
    @Kevin Kho I'm super close but there's one issue: when I register the flow, that changes the flow ID to re-run. So right now I'm trying the VersionGroupID, but I'm pretty sure that is going to change when I register this flow as well.
    tiingaVersionGroupID = '8afd1d50-37fa-427e-93b5-7be1422cdb8d'
    
    restartTime = pendulum.now('America/Chicago').add(hours = 2)
    
    retryID = client.create_flow_run(version_group_id = tiingaVersionGroupID,
                                     scheduled_start_time = restartTime,
                                     run_name = 'RETRY-TIINGA-DATA-LOADER',
                                     parameters = { "start_date":"{}".format(pendulum.now('America/Chicago').subtract(days = 10).strftime('%Y-%m-%d'))
                                                    })
    am I supposed to use an idempotency key to circumvent this?
    Kevin Kho
    1 year ago
    Oh what? Does it create a new flow or bump up a version?
    Joseph Loss
    1 year ago
    bump a version, since flows like this are going to be pretty permanent tasks that we run daily
    Kevin Kho
    1 year ago
    I see, will look into it.
    Joseph Loss
    1 year ago
    Thank you sir!
    @Kevin Kho hold on, I might have beat you to it
    from prefect.tasks.prefect import StartFlowRun
    f = StartFlowRun(flow_name='daily-tiinga-loader', project_name='sraPROD', run_name='TEST-RETRY')
    you can check this using f.serialize(), and then calling f.run() returns the ID of the newly scheduled flow run
    Kevin Kho
    1 year ago
    Oh I know what you’re saying now
    You might need to do a GraphQL query by flow name and project and get the flow id from there to feed into your create_flow_run
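    Roughly like this, if it helps (a sketch assuming the Prefect 1.x Cloud/Server GraphQL schema; the flow and project names are just the ones from your snippets):
    from prefect import Client

    client = Client()
    # look up the current (unarchived) version of the flow by name + project
    result = client.graphql(
        """
        query {
          flow(
            where: {
              name: { _eq: "daily-tiinga-loader" }
              project: { name: { _eq: "sraPROD" } }
              archived: { _eq: false }
            }
          ) {
            id
          }
        }
        """
    )
    flow_id = result.data.flow[0].id
    # flow_id can then be fed into client.create_flow_run(...)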
    Joseph Loss
    1 year ago
    Using the Prefect StartFlowRun task, it is working. Is there anything I should be concerned about with using this? I.e., do I still have to do a GraphQL query if this is working?
    Kevin Kho
    1 year ago
    Oh I see, yeah that’s the better solution I think
    Joseph Loss
    1 year ago
    thanks dude! Yeah this flow was rescheduled correctly, thankfully I can use the flow name with this method. Passing parameters, setting a scheduled start, and giving it a custom name are all working as well - we're good to go
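    For anyone finding this later, here's roughly how the pieces fit together in the state handler with StartFlowRun (a sketch; exact kwargs may vary slightly between Prefect 1.x versions):
    import pendulum
    import prefect
    from prefect.tasks.prefect import StartFlowRun

    def reschedule_on_failure(task, old_state, new_state):
        # only reschedule for the flagged "stale data" failure raised above
        if new_state.is_failed() and getattr(new_state.result, "flag", False):
            retry = StartFlowRun(flow_name='daily-tiinga-loader', project_name='sraPROD')
            new_run_id = retry.run(
                run_name='RETRY-TIINGA-DATA-LOADER',
                scheduled_start_time=pendulum.now('America/Chicago').add(hours=2),
                parameters={"start_date": pendulum.now('America/Chicago').subtract(days=10).strftime('%Y-%m-%d')},
            )
            prefect.context.get('logger').info("Scheduled retry flow run: {}".format(new_run_id))
        return new_state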