Joseph Loss
05/17/2021, 3:56 PMKevin Kho
Kevin Kho
Joseph Loss
05/17/2021, 6:24 PMJoseph Loss
05/17/2021, 6:25 PMJoseph Loss
05/17/2021, 6:26 PMKevin Kho
Kevin Kho
Kevin Kho
str(pendulum.now("US/Pacific").add(hours=3))
for the start time with that mutation
Joseph Loss
05/18/2021, 1:31 AMJoseph Loss
05/18/2021, 1:31 AMJoseph Loss
# [chat timestamp] 05/18/2021, 2:14 AM
def alert_on_special_failure(task, old_state, new_state):
    """State handler: alert Slack and schedule a retry run on a flagged failure.

    Acts only when ``new_state`` is failed and ``new_state.result`` carries
    ``flag is True`` (``fnValidateLastDate`` sets ``flag`` and ``value`` on the
    FAIL signal it raises). In that case it:

    1. logs and posts the failure, including ``result.value``, to Slack;
    2. creates a new flow run scheduled three hours from now;
    3. posts the new run's name, state and scheduled start time to Slack.

    Args:
        task: The task changing state; only ``task.name`` is read.
        old_state: The state being left (unused).
        new_state: The state being entered.

    Returns:
        ``new_state``, unchanged, for every transition.
    """
    logger = prefect.context.get('logger')

    if new_state.is_failed() and getattr(new_state.result, "flag", False) is True:
        logger.info("This is a test of the new data validation notification!")
        logger.info("Max date: {}".format(new_state.result.value))
        msg = "Task {} failed. Max date = {}".format(task.name, new_state.result.value)
        requests.post(slack_hook, json={"text": msg})

        # login to client and reschedule flow
        # NOTE(review): the token, tenant and flow id below are placeholders
        # from the original paste -- supply real values from configuration.
        client = prefect.Client(api_token="API_KEY")
        client.login_to_tenant(tenant_slug='TENANT_SLUG', tenant_id='TENANT_ID')
        tiingaFlowID = 'original_flow_ID'
        restartTime = pendulum.now('America/Chicago').add(hours=3)
        startDate = pendulum.now('America/Chicago').subtract(days=2).strftime('%Y-%m-%d')
        retryID = client.create_flow_run(
            flow_id=tiingaFlowID,
            scheduled_start_time=restartTime,
            run_name='RETRY-TIINGA-DATA-LOADER',
            parameters={"start_date": "{}".format(startDate)},
        )

        # get new scheduled flow run info
        flow_info = client.get_flow_run_info(retryID)
        new_flow_name = flow_info.name
        new_flow_state = flow_info.state
        # Year-month-day order; the original '%Y-%d-%m' swapped day and month.
        new_flow_scheduled_time = flow_info.scheduled_start_time.strftime('%Y-%m-%d %H:%M:%S %p')

        # notify slack of reschedule
        msg = "\n\nFlow: {}\nState: {}\nStart: {}".format(new_flow_name, new_flow_state, new_flow_scheduled_time)
        requests.post(slack_hook, json={"text": msg})

    return new_state
# --------------------------------------------------------------------------------------------------
# validate data is current and not stale
@task(state_handlers=[alert_on_special_failure])
def fnValidateLastDate(df):
    """Fail the task when ``df`` has nothing newer than the stored prices.

    Reads ``max(date)`` from ``defaultdb.tblsecuritypricestiinga`` and compares
    it with the newest ``date`` in ``df``.

    Args:
        df: DataFrame with a ``date`` column.

    Returns:
        ``df`` with missing values replaced by 0 (filled in place).

    Raises:
        signals.FAIL: When the newest date in ``df`` is not later than the
            newest stored date. The signal carries ``flag = True`` and
            ``value = <newest date in df>``, which ``alert_on_special_failure``
            reads from the failed state's result.
    """
    q = """SELECT max(date) as date from defaultdb.tblsecuritypricestiinga"""
    conn = fnOdbcConnect('defaultDSN')
    dfCheck = pd.read_sql_query(q, conn)
    # The stored date is rendered as a 'YYYY-MM-DD' string before comparing.
    # NOTE(review): this assumes df['date'] holds strings in the same format --
    # confirm against the caller.
    dfCheck['date'] = pd.to_datetime(dfCheck['date']).dt.strftime('%Y-%m-%d')

    # Compute the newest incoming date once; it is used in the check, the
    # message and the signal payload.
    newestDate = df['date'].max()
    if newestDate <= dfCheck['date'].max():
        fail_signal = signals.FAIL("Tiinga data is stale!! Max date: {}".format(newestDate))
        fail_signal.flag = True
        fail_signal.value = newestDate
        raise fail_signal

    df.fillna(0, inplace=True)
    return df
Kevin Kho
Joseph Loss
05/18/2021, 2:19 AMJoseph Loss
05/18/2021, 2:20 AMKevin Kho
Joseph Loss
# [chat timestamp] 05/18/2021, 4:12 PM
# Schedule the retry run by version group id instead of flow id.
# NOTE(review): presumably so the retry still targets the flow after it is
# re-registered -- confirm.
tiingaVersionGroupID = '8afd1d50-37fa-427e-93b5-7be1422cdb8d'
restartTime = pendulum.now('America/Chicago').add(hours=2)
startDate = pendulum.now('America/Chicago').subtract(days=10).strftime('%Y-%m-%d')
retryID = client.create_flow_run(
    version_group_id=tiingaVersionGroupID,
    scheduled_start_time=restartTime,
    run_name='RETRY-TIINGA-DATA-LOADER',
    parameters={"start_date": "{}".format(startDate)},
)
Joseph Loss
05/18/2021, 4:12 PMKevin Kho
Joseph Loss
05/18/2021, 4:14 PMKevin Kho
Joseph Loss
05/18/2021, 4:17 PMJoseph Loss
05/18/2021, 4:23 PMJoseph Loss
# [chat timestamp] 05/18/2021, 4:25 PM
from prefect.tasks.prefect import StartFlowRun

# Task that starts a run of the named flow in the named project.
f = StartFlowRun(
    flow_name='daily-tiinga-loader',
    project_name='sraPROD',
    run_name='TEST-RETRY',
)
You can check this using f.serialize(); calling f.run() then returns the flow ID of the newly scheduled flow.
Kevin Kho
Kevin Kho
Joseph Loss
05/18/2021, 4:41 PMKevin Kho
Joseph Loss
05/18/2021, 4:43 PM