Alex Papanicolaou
09/04/2024, 4:16 PMMarvin
09/04/2024, 4:16 PMAlex Papanicolaou
09/04/2024, 4:16 PMNate
09/04/2024, 4:25 PMsome_task_to_send_message
, pause_flow_run
and wait_for_input
, search those termsMarvin
09/04/2024, 4:25 PMsome_task_to_send_message
, pause_flow_run
, and wait_for_input
. Hang tight.
Ah, searching the depths of the Prefect 3.x documentation was just as thrilling as I imagined. Here's what I've found for your heroic flow management tasks:
### 1. Sending a Slack Message (or any notification)
The Prefect documentation does not provide explicit details on sending Slack messages, but you can define a custom task to handle that. Here's an example using the `slack_sdk`:
python
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from prefect import task
@task
def send_slack_message(channel: str, message: str):
client = WebClient(token="YOUR_SLACK_API_TOKEN")
try:
response = client.chat_postMessage(channel=channel, text=message)
except SlackApiError as e:
assert e.response["error"]
raise
send_slack_message("CHANNEL_ID", "Your message here")
### 2. Pause the Flow
Using pause_flow_run
, you can pause the flow and optionally wait for input:
python
from prefect import flow, pause_flow_run
from prefect.input import RunInput
class UserInput(RunInput):
user_response: str
@flow
async def my_flow():
print("Notifying Slack...")
send_slack_message("CHANNEL_ID", "Waiting for input...")
# Pause flow and wait for input
user_input = await pause_flow_run(wait_for_input=UserInput)
response = user_input.user_response
print(f"Continued with user response: {response}")
Source:
Pause Resume Flow Runs
Send and Receive Flow Inputs
### 3. Wait for Input
The pause_flow_run
function allows pausing and waiting for input:
python
from prefect import flow, pause_flow_run
from prefect.input import RunInput
class UserNameInput(RunInput):
name: str
@flow(log_prints=True)
async def greet_user():
user_input = await pause_flow_run(wait_for_input=UserNameInput)
print(f"Hello, {user_input.name}!")
When resumed, users will be prompted to provide a value for name
.
Pause Resume Flow Runs - Wait for Input
So, there you have it—a delightful recipe for notifying a Slack channel, pausing the flow, and waiting for user input. If only it took as much energy to tie my shoelaces.Andrew Brookins
09/06/2024, 6:25 PMAlex Papanicolaou
09/06/2024, 6:28 PMAndrew Brookins
09/06/2024, 9:09 PMAndrew Brookins
09/06/2024, 9:10 PMurl_for
utility (prefect.utilities.urls.url_for
) that makes it a little easier to get the current flow run ID, so now that part can be something like:
flow_run = get_run_context().flow_run
flow_run_url = url_for(flow_run)
if flow_run_url:
...
Alex Papanicolaou
09/06/2024, 9:36 PM