https://prefect.io logo
Title
j

Jyothi

07/26/2022, 2:21 PM
Currently I have this:
def escalate(obj, old_state, new_state):
    if new_state.is_failed() and new_state.message not in IGNORE_MESSAGES:
        msg = f"{obj} {new_state.message}"
        slack_post(msg, channel="#errors", username="Prefect")
    return new_state
and i would like to
def escalate(obj, old_state, new_state, error_channel="#errors"):
because i need have different channel for some flows
1
a

Anna Geller

07/26/2022, 3:19 PM
k

Khuyen Tran

07/26/2022, 3:25 PM
I believe this is the code snippet you are looking for from the article Anna posted:
import prefect
from prefect import Flow, task, Parameter
from prefect.tasks.prefect import RenameFlowRun


def rename_handler(obj, new_state, old_state):
    if new_state.is_running():
        user = prefect.context.parameters.get("user_name")
        param_value = user.lower().replace(" ", "_")
        RenameFlowRun().run(flow_run_name=f"hello_{param_value}")
    return


@task(log_stdout=True)
def greet_user(user_name):
    print(f"Hello {user_name}")


with Flow("using_parameters_in_state_handler", state_handlers=[rename_handler]) as flow:
    param = Parameter("user_name", default="Jerry Seinfeld")
    greet_user(param)
Basically, you can pass a parameter to your state handler using
Parameter
🙌 1
:thank-you: 1
j

Jyothi

07/26/2022, 4:05 PM
Thanks @Anna Geller and @Khuyen Tran. But i don't want to pass the parameter to each function. Suppose i have a prefect flow script which contains 6 tasks and each task one state handler. I need to pass name to state handler by not passing the parameter to each function
@Anna Geller any input on this?
@task(state_handlers=[escalate])
def test_script1():
    script1.main({"all": True, "ingest_type": "today"})


@task(state_handlers=[escalate])
def test_script2():
    script2.main({"epid": True})

def main():
    flow = Flow("Test data ingestion", schedule=schedule, state_handlers=[escalate])
    param = Parameter("slack_channel_name", default="#api-errors")
    flow.add_task(test_script1)
    flow.set_dependencies(
        test_script2, upstream_tasks=[test_script1]
    )

    register_flow(
        flow=flow,
        project_name=PIPELINE_PROJECT,
    )
a

Anna Geller

07/27/2022, 7:20 AM
What problem are you trying to solve?
j

Jyothi

07/27/2022, 9:33 AM
def escalate(obj, old_state, new_state):
    if new_state.is_failed() and new_state.message not in IGNORE_MESSAGES:
        msg = f"{obj} {new_state.message}"
        slack_post(msg, channel="#errors", username="Prefect")
    return new_state
i need to access param = Parameter("slack_channel_name", default="#api-errors") in the escalate function
a

Anna Geller

07/27/2022, 11:07 AM
Ahh gotcha, try using prefect.context.parameters.get("slack_channel_name")
j

Jyothi

07/27/2022, 2:35 PM
where?. In below?
def main():
    flow = Flow("Test data ingestion", schedule=schedule, state_handlers=[escalate])
    param = Parameter("slack_channel_name", default="#api-errors")
    flow.add_task(test_script1)
    flow.set_dependencies(
        test_script2, upstream_tasks=[test_script1]
    )

    register_flow(
        flow=flow,
        project_name=PIPELINE_PROJECT,
    )
a

Anna Geller

07/27/2022, 3:20 PM
In your state handler function
j

Jyothi

07/27/2022, 3:22 PM
thanks