Jyothi
07/26/2022, 2:21 PMdef escalate(obj, old_state, new_state):
if new_state.is_failed() and new_state.message not in IGNORE_MESSAGES:
msg = f"{obj} {new_state.message}"
slack_post(msg, channel="#errors", username="Prefect")
return new_state
and i would like to
def escalate(obj, old_state, new_state, error_channel="#errors"):
because i need have different channel for some flowsAnna Geller
07/26/2022, 3:19 PMKhuyen Tran
07/26/2022, 3:25 PMimport prefect
from prefect import Flow, task, Parameter
from prefect.tasks.prefect import RenameFlowRun
def rename_handler(obj, new_state, old_state):
if new_state.is_running():
user = prefect.context.parameters.get("user_name")
param_value = user.lower().replace(" ", "_")
RenameFlowRun().run(flow_run_name=f"hello_{param_value}")
return
@task(log_stdout=True)
def greet_user(user_name):
print(f"Hello {user_name}")
with Flow("using_parameters_in_state_handler", state_handlers=[rename_handler]) as flow:
param = Parameter("user_name", default="Jerry Seinfeld")
greet_user(param)
Basically, you can pass a parameter to your state handler using Parameter
Jyothi
07/26/2022, 4:05 PM@task(state_handlers=[escalate])
def test_script1():
script1.main({"all": True, "ingest_type": "today"})
@task(state_handlers=[escalate])
def test_script2():
script2.main({"epid": True})
def main():
flow = Flow("Test data ingestion", schedule=schedule, state_handlers=[escalate])
param = Parameter("slack_channel_name", default="#api-errors")
flow.add_task(test_script1)
flow.set_dependencies(
test_script2, upstream_tasks=[test_script1]
)
register_flow(
flow=flow,
project_name=PIPELINE_PROJECT,
)
Anna Geller
07/27/2022, 7:20 AMJyothi
07/27/2022, 9:33 AMdef escalate(obj, old_state, new_state):
if new_state.is_failed() and new_state.message not in IGNORE_MESSAGES:
msg = f"{obj} {new_state.message}"
slack_post(msg, channel="#errors", username="Prefect")
return new_state
i need to access param = Parameter("slack_channel_name", default="#api-errors") in the escalate functionAnna Geller
07/27/2022, 11:07 AMJyothi
07/27/2022, 2:35 PMdef main():
flow = Flow("Test data ingestion", schedule=schedule, state_handlers=[escalate])
param = Parameter("slack_channel_name", default="#api-errors")
flow.add_task(test_script1)
flow.set_dependencies(
test_script2, upstream_tasks=[test_script1]
)
register_flow(
flow=flow,
project_name=PIPELINE_PROJECT,
)
Anna Geller
07/27/2022, 3:20 PMJyothi
07/27/2022, 3:22 PM