Ash
02/16/2024, 9:11 AM
from prefect import task
@task
def signal_task(message):
    """Raise an error when the stop signal is received.

    Args:
        message: Value to inspect; only the exact string
            'stop_immediately!' triggers the error.

    Raises:
        RuntimeError: If ``message`` equals 'stop_immediately!'.
    """
    if message == 'stop_immediately!':
        # Built-in exceptions take their message positionally; passing it as
        # ``message=`` raises TypeError instead of the intended RuntimeError.
        raise RuntimeError('Got a signal to end the task run!')
Sean Davis
02/16/2024, 12:50 PM
Ash
02/16/2024, 2:05 PM
Sean Davis
02/16/2024, 2:52 PM
@task
def get_data():  # probably provide parameters here....
    """Fetch data and log how many results were found.

    Returns:
        The fetched results, or None when the result set is empty.
    """
    logger = get_run_logger()
    results = ...  # do your data fetching logic
    if len(results) == 0:
        logger.info('no results found')
        return None
    # The f-prefix is required; without it the braces are logged literally.
    logger.info(f'{len(results)} found')
    return results
@flow
def main_flow():
    """Run get_data() and log whether any results came back.

    A None result from get_data() is treated as an expected outcome and is
    only logged; nothing is raised.
    """
    # The flow needs its own logger; none is defined at this scope otherwise.
    logger = get_run_logger()
    data_results = get_data()
    if data_results is None:
        logger.info('Just repeating that no results found, but no worries, this is expected')
    else:
        # Report on the value returned to this flow (data_results); the name
        # `results` only exists inside get_data().
        logger.info(f'Again, just repeating that {len(data_results)} found')
Ash
02/16/2024, 2:55 PM
Ash
02/16/2024, 2:58 PM
maximum_question_id = extract_maximum_question_id(questions_table_name, connection_string)
if maximum_question_id == "No data in table.":
slack_message(server + "." + schema + ": Table appears to be empty, no max ID found.")
return Failed(message="Table appears to be empty, no max ID found.")
else:
questions_data = extract_questions(url, maximum_question_id)
if questions_data[0] == "Can't reach URL":
slack_message(server + "." + schema + ": Can't reach URL - status code " + str(questions_data[1]))
return Failed(message="Can't reach URL - status code " + str(questions_data[1]))
elif questions_data == "No data in response":
Warning = State(type=StateType.CANCELLED, name="SKIPPED", message="JSON response is empty.")
return Warning
else:
extracted_data = structure_questions(questions_data)
questionnaire_dataframe = transform_questions(extracted_data)
final = load_questions(connection_string, questions_table_name, questionnaire_dataframe)
if final:
if final[0] == "Issue when loading data":
slack_message(server + "." + schema + ": Issue when loading data - " + str(final[1]))
return Failed(message="Issue when loading data - " + str(final[1]))
Sean Davis
02/16/2024, 3:03 PM
Ash
02/16/2024, 3:04 PM