Tomek Florek
    7 months ago
    Hey all 🙂 I’m setting up Prefect with GE and can’t find the right way to implement the logic I have in mind. If the GE validation fails (some data is wrong), I would like to be notified via Slack (Prefect Automation) as a sort of warning, but the flow should carry on. What would be the correct way to implement that? 🧐
Anna Geller
    7 months ago
    Triggers allow your flow to carry on even when some tasks, such as the GE validation, fail. You can e.g. set the all_finished trigger to run a task regardless of whether its upstream tasks finished successfully or not
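    For illustration, a minimal sketch of the all_finished trigger in practice - assuming Prefect 1.x; the downstream task, flow name, checkpoint name, and file path are placeholders, not taken from the thread:
    from prefect import task, Flow
    from prefect.tasks.great_expectations import RunGreatExpectationsValidation
    from prefect.triggers import all_finished

    # hypothetical checkpoint name
    validate = RunGreatExpectationsValidation(checkpoint_name="my_checkpoint")

    # all_finished: run this task once its upstream tasks finish, even if they failed
    @task(trigger=all_finished)
    def load_downstream(path):
        print(f"loading {path} regardless of the validation outcome")

    with Flow("ge-validation-continues") as flow:
        check = validate()
        load_downstream("/tmp/data.parquet", upstream_tasks=[check])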
Tomek Florek
    7 months ago
    Thank you Anna! I have tried setting that up, but in this scenario I can’t get the Slack notification to fire (since the flow is successful anyway). The problem is altering the state of the flow (to trigger the Slack alert) based on the failure of one task (GE). Maybe I’m looking in the wrong direction and should just use https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#installation-instructions instead for that specific task
Anna Geller
    7 months ago
    You’re 100% correct - Automations would send you an alert on flow run failure, not for one specific task. For that, you can use a state handler - here is an example:
    import prefect
    from prefect import task, Flow
    from prefect.tasks.notifications import SlackTask
    from typing import cast
    
    
    def post_to_slack_on_failure(task, old_state, new_state):
        if new_state.is_failed():
            if isinstance(new_state.result, Exception):
                value = "```{}```".format(repr(new_state.result))
            else:
                value = cast(str, new_state.message)
            msg = (
                f"The task `{prefect.context.task_name}` failed "
                f"in a flow run {prefect.context.flow_run_id} "
                f"with an exception {value}"
            )
            SlackTask(message=msg).run()
        return new_state
    
    
    @task(state_handlers=[post_to_slack_on_failure])
    def divide_numbers(a, b):
        return 1 / (b - a)
    
    
    with Flow(name="state-inspection-handler") as flow:
        result = divide_numbers(1, 1)
    
    
    if __name__ == "__main__":
        flow.run()
Tomek Florek
    7 months ago
    That’s great! Would it by default post to the "SLACK_WEBHOOK_URL" configured in secrets?
Anna Geller
    7 months ago
    Correct
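    A minimal sketch of supplying that secret for a local run - assuming Prefect 1.x with local secrets enabled; the webhook URL is a placeholder, and in Prefect Cloud/Server the secret would live in the Secrets store instead:
    import prefect
    from prefect.tasks.notifications import SlackTask

    # SlackTask reads the "SLACK_WEBHOOK_URL" secret by default;
    # here it is provided via context just for a local test run
    with prefect.context(secrets={"SLACK_WEBHOOK_URL": "https://hooks.slack.com/services/..."}):
        SlackTask(message="hello from Prefect").run()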
Tomek Florek
    7 months ago
    Thank you Anna 🙂
    If I could get your opinion, I have a low-key question: what’s the clean way to give an order to unrelated tasks (ones not depending on output from the previous task)? What would you change here?
    # Send notifications only when GE fails
    handler = slack_notifier(only_states=[Failed])
    check_data = RunGreatExpectationsValidation(state_handlers=[handler], checkpoint_name="user_service_customer_keys")
    load_to_s3 = S3Upload(
        name='Upload',
        bucket=f'plugsurfing-{ENVIRONMENT}-data-warehouse',
        trigger=always_run
    )
    # Every 6 hours
    schedule = Schedule(clocks=[IntervalClock(timedelta(hours=6))])
    
    with Flow(f'Import {TABLE} table from {SERVICE}', schedule) as flow:
        data_frame = extract_from_rdb(key=SSM_KEY, query=query)
        clean_data_frame = fix_column_types(data_frame)
        save_to_parquet(
            data_frame=clean_data_frame,
            local_path=OUTPUT_DIR + LANDING_FILENAME
        ).set_downstream(
            check_data
        ).set_downstream(
            load_to_s3(
                data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
                key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
            )
        )
Anna Geller
    7 months ago
    so the intended order is: extract_from_rdb -> fix_column_types -> save_to_parquet -> load_to_s3 + check_data in parallel?
Tomek Florek
    7 months ago
    ah, no. actually it’s finishing with save_to_parquet -> check_data -> load_to_s3
Anna Geller
    7 months ago
    normally, you can define state dependencies using the upstream_tasks keyword, e.g.
    load_to_s3(data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
                key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
                upstream_tasks=[check_data])
    it’s easier than the set_downstream method. Also, it’s easier if you assign a name to a task, e.g.
    save_locally = save_to_parquet(
            data_frame=clean_data_frame,
            local_path=OUTPUT_DIR + LANDING_FILENAME
        )
    this way you can reference “save_locally” in the upstream_tasks
Tomek Florek
    7 months ago
    That’s how I’ve tried it, but then connecting/chaining the check_data to the previous tasks seemed tricky. So I could set a few upstream tasks, as here?
    load_to_s3(data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
                key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
                upstream_tasks=[save_locally, check_data])
    Here’s what I intended to send in the first place, which looks a bit messy:
    with Flow(f'Import {TABLE} table from {SERVICE}', schedule) as flow:
        data_frame = extract_from_rdb(key=SSM_KEY, query=query)
        clean_data_frame = fix_column_types(data_frame)
        save_locally = save_to_parquet(
            data_frame=clean_data_frame,
            local_path=OUTPUT_DIR + LANDING_FILENAME
        ).set_downstream(
            check_data.set_downstream(
                load_to_s3(
                    data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
                    key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
                )
            )
        )
Anna Geller
    7 months ago
    upstream_tasks is a list, so you can provide multiple tasks to it. Can you share the full flow definition if you still need help here? You can use flow.visualize() to see whether the dependencies are set as intended
Tomek Florek
    7 months ago
    Since it’s a list, will the tasks in it be executed in the given order? So I could pass [extract_data, transform_data, save_data]?
Anna Geller
    7 months ago
    No, this will only ensure that those tasks are finished before the task to which you attach this list of dependencies. I think it would really help if you run flow.visualize() - this will resolve all ambiguities. Alternatively, send me (you can DM) the full flow definition and tell me in which order you want the tasks to run (you can even draw 😄 ) and I can check what is defined incorrectly
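    For reference, a sketch of the intended chain (save_to_parquet -> check_data -> load_to_s3) expressed with upstream_tasks - this reuses the task and constant names already defined earlier in the thread, so it is only the flow body under those assumptions, not a verified end-to-end flow:
    with Flow(f'Import {TABLE} table from {SERVICE}', schedule) as flow:
        data_frame = extract_from_rdb(key=SSM_KEY, query=query)
        clean_data_frame = fix_column_types(data_frame)
        save_locally = save_to_parquet(
            data_frame=clean_data_frame,
            local_path=OUTPUT_DIR + LANDING_FILENAME
        )
        # state-only dependency: run the GE validation after the file is written
        validated = check_data(upstream_tasks=[save_locally])
        # upload after validation has finished (the always_run trigger set on
        # load_to_s3 lets it proceed even if the validation failed)
        load_to_s3(
            data=f'{OUTPUT_DIR}{LANDING_FILENAME}.parquet',
            key=f'{LANDING_LOCATION}{LANDING_FILENAME}',
            upstream_tasks=[validated]
        )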
Tomek Florek
    7 months ago
    Thank you Anna! I think I’ve got it figured out thanks to upstream tasks. I also went ahead and set up post_to_slack_on_failure, but I’m missing a URL to the task run there - do you know how one might add it?
Anna Geller
    7 months ago
    yes:
    team_slug = "anna-prefect"  # replace with your own team slug
    task_run_id = prefect.context.get("task_run_id")
    url = f"https://cloud.prefect.io/{team_slug}/task-run/{task_run_id}?overview"
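    Put together with the earlier handler, a sketch might look like this (same imports as in the first example; the team slug is a placeholder):
    def post_to_slack_on_failure(task, old_state, new_state):
        if new_state.is_failed():
            team_slug = "anna-prefect"  # replace with your own team slug
            task_run_id = prefect.context.get("task_run_id")
            url = f"https://cloud.prefect.io/{team_slug}/task-run/{task_run_id}?overview"
            msg = (
                f"The task `{prefect.context.task_name}` failed "
                f"in flow run {prefect.context.flow_run_id}: {url}"
            )
            SlackTask(message=msg).run()
        return new_state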
Tomek Florek
    7 months ago
    Works like a charm, thank you very much 🙂