Tomek Florek
01/26/2022, 9:58 AM

Anna Geller
You can use the all_finished trigger to run tasks regardless of whether they finish successfully or not
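(For illustration, a minimal Prefect 1.x sketch of that trigger; the task names below are made up, not from this thread.)

from prefect import task, Flow
from prefect.triggers import all_finished

@task
def extract():
    raise ValueError("simulated upstream failure")

# all_finished: run this task once all upstream tasks are done,
# whether they succeeded or failed
@task(trigger=all_finished)
def cleanup():
    print("cleaning up temporary files")

with Flow("all-finished-example") as flow:
    cleanup(upstream_tasks=[extract()])
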
Tomek Florek
01/26/2022, 10:47 AM

Anna Geller
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from typing import cast


def post_to_slack_on_failure(task, old_state, new_state):
    # state handler: called on every state change, posts to Slack only on failure
    if new_state.is_failed():
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = cast(str, new_state.message)
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {prefect.context.flow_run_id} "
            f"with an exception {value}"
        )
        SlackTask(message=msg).run()
    return new_state


@task(state_handlers=[post_to_slack_on_failure])
def divide_numbers(a, b):
    return 1 / (b - a)


with Flow(name="state-inspection-handler") as flow:
    result = divide_numbers(1, 1)

if __name__ == "__main__":
    flow.run()

Tomek Florek
01/26/2022, 11:00 AM
"SLACK_WEBHOOK_URL" configured in secrets?
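(For illustration: SlackTask reads a Secret named SLACK_WEBHOOK_URL. With Prefect Cloud you would create that Secret in the UI; for local runs it is commonly set in ~/.prefect/config.toml under [context.secrets] or via the PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL environment variable. For a quick local test it can also be injected in code, as in this sketch; the webhook URL is a placeholder and "flow" is the flow defined above.)

import prefect

# placeholder webhook URL; local secret resolution assumes use_local_secrets (the default)
with prefect.context(secrets={"SLACK_WEBHOOK_URL": "https://hooks.slack.com/services/XXX/YYY/ZZZ"}):
    flow.run()
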
Anna Geller

Tomek Florek
01/26/2022, 11:10 AM

Tomek Florek
01/26/2022, 11:27 AM
# Send notifications only when GE fails
handler = slack_notifier(only_states=[Failed])
check_data = RunGreatExpectationsValidation(
    state_handlers=[handler],
    checkpoint_name="user_service_customer_keys",
)

load_to_s3 = S3Upload(
    name='Upload',
    bucket=f'plugsurfing-{ENVIRONMENT}-data-warehouse',
    trigger=always_run
)

# Every 6 hours
schedule = Schedule(clocks=[IntervalClock(timedelta(hours=6))])

with Flow(f'Import {TABLE} table from {SERVICE}', schedule) as flow:
    data_frame = extract_from_rdb(key=SSM_KEY, query=query)
    clean_data_frame = fix_column_types(data_frame)
    save_to_parquet(
        data_frame=clean_data_frame,
        local_path=OUTPUT_DIR + LANDING_FILENAME
    ).set_downstream(
        check_data
    ).set_downstream(
        load_to_s3(
            data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
            key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
        )
    )

Anna Geller
Tomek Florek
01/26/2022, 11:40 AM

Anna Geller
You could use the upstream_tasks keyword, e.g.
load_to_s3(data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
           key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
           upstream_tasks=[check_data])

Anna Geller
save_locally = save_to_parquet(
    data_frame=clean_data_frame,
    local_path=OUTPUT_DIR + LANDING_FILENAME
)
this way you can reference “save_locally” in the upstream_tasks
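(Putting the two suggestions together, a minimal sketch of the flow body with both dependencies expressed through upstream_tasks; names are reused from the snippets above, and exact call arguments depend on the task definitions. Note that calling check_data(...) inside the flow returns a task copy, so the result is referenced rather than the original check_data instance.)

with Flow(f'Import {TABLE} table from {SERVICE}', schedule) as flow:
    data_frame = extract_from_rdb(key=SSM_KEY, query=query)
    clean_data_frame = fix_column_types(data_frame)

    # keep a reference to the local save so it can be used as an upstream task
    save_locally = save_to_parquet(
        data_frame=clean_data_frame,
        local_path=OUTPUT_DIR + LANDING_FILENAME
    )

    # run the Great Expectations check only after the file is written locally
    validation = check_data(upstream_tasks=[save_locally])

    # upload only after both the local save and the validation have finished
    load_to_s3(
        data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
        key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
        upstream_tasks=[save_locally, validation],
    )
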
Tomek Florek
01/26/2022, 11:42 AM

Tomek Florek
01/26/2022, 11:44 AM
load_to_s3(data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
           key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
           upstream_tasks=[save_locally, check_data])

Tomek Florek
01/26/2022, 11:49 AM
with Flow(f'Import {TABLE} table from {SERVICE}', schedule) as flow:
    data_frame = extract_from_rdb(key=SSM_KEY, query=query)
    clean_data_frame = fix_column_types(data_frame)
    save_locally = save_to_parquet(
        data_frame=clean_data_frame,
        local_path=OUTPUT_DIR + LANDING_FILENAME
    ).set_downstream(
        check_data.set_downstream(
            load_to_s3(
                data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
                key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
            )
        )
    )

Anna Geller
You can use flow.visualize() to help see if dependencies are set as intended
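(For example, when running locally you could render the task graph before flow.run(); flow.visualize() needs the graphviz package installed.)

if __name__ == "__main__":
    flow.visualize()  # renders the task graph so dependencies can be checked visually
    flow.run()
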
Tomek Florek
01/26/2022, 12:36 PM

Anna Geller
flow.visualize() - this will resolve all ambiguities

Anna Geller
Tomek Florek
01/26/2022, 2:00 PM
I'm using post_to_slack_on_failure, but I'm missing a URL to the task there, do you know how one might add it?

Anna Geller
team_slug = "anna-prefect"  # replace with your team slug
task_run_id = prefect.context.get("task_run_id")
url = f"<https://cloud.prefect.io/{team_slug}/task-run/{task_run_id}?overview>"
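(For illustration, the snippet above folded into the earlier post_to_slack_on_failure handler; team_slug is a placeholder.)

def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_failed():
        team_slug = "anna-prefect"  # replace with your team slug
        task_run_id = prefect.context.get("task_run_id")
        url = f"<https://cloud.prefect.io/{team_slug}/task-run/{task_run_id}?overview>"
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in flow run {prefect.context.flow_run_id}: {url}"
        )
        SlackTask(message=msg).run()
    return new_state
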
Tomek Florek
01/26/2022, 2:28 PM