# ask-community
t
Hey all 🙂 I’m setting up Prefect with GE and can’t find the right way to implement the logic I have in mind. If the GE validation fails (some data is wrong), I would like to be notified via Slack (Prefect Automation) as a sort of warning, but the flow should carry on. What would be the correct way to implement that? 🧐
a
Triggers allow your flow to carry on even when some tasks, such as the GE validation, fail. You can e.g. set the `all_finished` trigger on a task to run it regardless of whether its upstream tasks finished successfully or not.
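A minimal sketch of what that looks like (hypothetical task names, assuming Prefect 1.x):
Copy code
from prefect import task, Flow
from prefect.triggers import all_finished


@task
def validate():
    # stand-in for the GE validation task
    raise ValueError("data validation failed")


@task(trigger=all_finished)
def load():
    # runs whether or not validate succeeded
    print("carrying on")


with Flow("trigger-example") as flow:
    load(upstream_tasks=[validate()])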
t
Thank you Anna! I have tried setting that up, and in this scenario I cannot get the Slack notification to fire (since the flow is successful anyway). The problem is altering the state of the flow (to trigger the Slack alert) based on the failure of one task (GE). Maybe I’m looking in the wrong direction and should just use https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#installation-instructions instead for that specific task
a
You’re 100% correct: Automations would send you an alert on flow-run failure, not for one specific task. For that, you can use a state handler - here is an example:
Copy code
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from typing import cast


def post_to_slack_on_failure(task, old_state, new_state):
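    # State handlers are called on every state transition; only act on failure.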
    if new_state.is_failed():
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = cast(str, new_state.message)
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {prefect.context.flow_run_id} "
            f"with an exception {value}"
        )
        SlackTask(message=msg).run()
    return new_state


@task(state_handlers=[post_to_slack_on_failure])
def divide_numbers(a, b):
    return 1 / (b - a)


with Flow(name="state-inspection-handler") as flow:
    result = divide_numbers(1, 1)


if __name__ == "__main__":
    flow.run()
t
That’s great! Would it by default post to the `SLACK_WEBHOOK_URL` configured in secrets?
a
Correct
🙏 1
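For example, with a local secret you can set it through Prefect’s environment-variable config convention (the webhook URL below is a placeholder):
Copy code
import os

# Prefect 1.x reads local secrets from PREFECT__CONTEXT__SECRETS__<NAME>;
# set this before importing prefect. Replace the placeholder URL with yours.
os.environ["PREFECT__CONTEXT__SECRETS__SLACK_WEBHOOK_URL"] = (
    "https://hooks.slack.com/services/XXX/YYY/ZZZ"
)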
t
Thank you Anna 🙂
👍 1
If I could get your opinion, I have a lowkey question. What’s the clean way to order unrelated tasks (ones that don’t depend on output from the previous task)? What would you change here?
Copy code
# Send notifications only when GE fails
handler = slack_notifier(only_states=[Failed])
check_data = RunGreatExpectationsValidation(state_handlers=[handler], checkpoint_name="user_service_customer_keys")
load_to_s3 = S3Upload(
    name='Upload',
    bucket=f'plugsurfing-{ENVIRONMENT}-data-warehouse',
    trigger=always_run
)
# Every 6 hours
schedule = Schedule(clocks=[IntervalClock(timedelta(hours=6))])

with Flow(f'Import {TABLE} table from {SERVICE}', schedule) as flow:
    data_frame = extract_from_rdb(key=SSM_KEY, query=query)
    clean_data_frame = fix_column_types(data_frame)
    save_to_parquet(
        data_frame=clean_data_frame,
        local_path=OUTPUT_DIR + LANDING_FILENAME
    ).set_downstream(
        check_data
    ).set_downstream(
        load_to_s3(
            data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
            key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
        )
    )
a
so the intended order is: extract_from_rdb -> fix_column_types -> save_to_parquet -> load_to_s3 + check_data in parallel?
t
ah, no. actually it’s finishing with save_to_parquet -> check_data -> load_to_s3
a
Normally, you can define state dependencies using the `upstream_tasks` keyword, e.g.
Copy code
load_to_s3(data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
           key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
           upstream_tasks=[check_data])
It’s easier than the set_downstream method. Also, it’s easier if you assign a name to a task, e.g.
Copy code
save_locally = save_to_parquet(
    data_frame=clean_data_frame,
    local_path=OUTPUT_DIR + LANDING_FILENAME
)
This way you can reference `save_locally` in the `upstream_tasks`.
👍 1
t
That’s how I’ve tried it, but then connecting/chaining the check_data to the previous tasks seemed tricky.
So I could set a few upstream tasks, as here?
Copy code
load_to_s3(data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
           key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
           upstream_tasks=[save_locally, check_data])
👍 1
Here is what I intended to send in the first place, which looks a bit messy:
Copy code
with Flow(f'Import {TABLE} table from {SERVICE}', schedule) as flow:
    data_frame = extract_from_rdb(key=SSM_KEY, query=query)
    clean_data_frame = fix_column_types(data_frame)
    save_locally = save_to_parquet(
        data_frame=clean_data_frame,
        local_path=OUTPUT_DIR + LANDING_FILENAME
    ).set_downstream(
        check_data.set_downstream(
            load_to_s3(
                data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
                key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
            )
        )
    )
a
The `upstream_tasks` argument is a list, so you can provide multiple tasks to it. Can you share the full flow definition if you still need help here? You can use `flow.visualize()` to help see whether dependencies are set as intended.
t
Since it’s a list, will the tasks in it be executed in the given order? So I could pass `[extract_data, transform_data, save_data]`?
a
No, this will only ensure that those tasks finish before the task to which you attach this list of dependencies. I think it would really help if you run `flow.visualize()` - this will resolve all ambiguities.
Alternatively, send me (you can DM) a full flow definition and tell me in which order you want the tasks to run (you can even draw 😄) and I can check what is defined incorrectly.
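If you do need a strict order between otherwise unrelated tasks, chain each one as an upstream of the next - a minimal sketch with hypothetical no-op tasks:
Copy code
from prefect import task, Flow


@task
def extract_data():
    pass


@task
def transform_data():
    pass


@task
def save_data():
    pass


with Flow("ordered-example") as flow:
    e = extract_data()
    t = transform_data(upstream_tasks=[e])  # runs only after extract_data
    s = save_data(upstream_tasks=[t])       # runs only after transform_data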
t
Thank you Anna! I think I’ve got it figured out thanks to upstream tasks. I also went ahead and set up `post_to_slack_on_failure`, but I’m missing a URL to the task run there - do you know how one might add it?
a
yes:
Copy code
team_slug = "anna-prefect"  # replace with yours
task_run_id = prefect.context.get("task_run_id")
url = f"https://cloud.prefect.io/{team_slug}/task-run/{task_run_id}?overview"
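For context, a sketch of how that could slot into the earlier handler (the message wording is just an example):
Copy code
def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_failed():
        team_slug = "anna-prefect"  # replace with yours
        task_run_id = prefect.context.get("task_run_id")
        url = f"https://cloud.prefect.io/{team_slug}/task-run/{task_run_id}?overview"
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in flow run {prefect.context.flow_run_id} - see {url}"
        )
        SlackTask(message=msg).run()
    return new_state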
t
Works like a charm, thank you very much 🙂
👍 1