Kayvon Avishan
04/24/2024, 2:19 PMNate
04/24/2024, 2:20 PMMarvin
04/24/2024, 2:21 PMpython
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL
def notify_slack(flow, flow_run, state):
slack_webhook_block = Block.load("slack-webhook/my-slack-webhook")
slack_webhook_block.notify((
f"Your job {flow_run.name} entered {state.name} "
f"with message:\n\n"
f"See <https://{PREFECT_API_URL.value()}/flow-runs/"
f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
f"Tags: {flow_run.tags}\n\n"
f"Scheduled start: {flow_run.expected_start_time}"
))
@flow(on_failure=[notify_slack], retries=1)
def failing_flow():
raise ValueError("oops!")
if __name__ == "__main__":
failing_flow()
This example uses a client-side hook on a flow run entering a Failed
state to send a message to Slack, but you could adapt this to log messages to the Prefect Cloud UI by using appropriate functions or mechanisms provided by Prefect for logging.
Moreover, as shown in another provided example, you could define state change hooks for tasks and flows by simply writing functions that follow a specific signature and then decorating your tasks or flows with these hooks. For example:
python
@task(on_failure=[my_task_hook])
In each hook function, you could include logic to send logs to the desired destination.
Here is the link to the State Change Hooks documentation for detailed guidance on how to implement state change hooks in your Prefect flows.
Please note that while these examples give a general guideline, logging directly to the Prefect Cloud UI may require further configurations or use of Prefect's logging utilities. Make sure to check Prefect's documentation or support resources for information specific to logging within the Prefect Cloud context.