<@ULVA73B9P> I am create a markdown artifact at th...
# ask-marvin
g
@Marvin I am create a markdown artifact at the end of my flow run. how can I also post this markdown to slack? I would also like to include a like to the flow run in slack
m
To post markdown to Slack and include a link to the flow run, you can perform the following steps: 1. Post Markdown to Slack: Prefect has an integration with Slack, as mentioned in Prefect's documentation. You can use the prefect-slack integration to post messages to Slack channels. You need to configure a Slack webhook credential and use tasks or automations to send messages to Slack. As there are no direct examples found in the docs search, refer to the prefect-slack repository for guidance on how to contribute and use the integration. 2. Include a link to the flow run: For dynamically generating flow run URLs, you can utilize the
prefect.runtime
module to access flow run information. Construct the URL using the necessary information and append it to your Slack message. Unfortunately, specific code examples on including the flow run URL weren't found directly through the tools used, but you can review the concepts/flows documentation to better understand how to utilize the
prefect.runtime
module. The following is a generalized Python snippet using Prefect and Slack integration:
Copy code
python
from prefect import flow, task
from prefect.tasks import SlackTask

@task
def create_markdown_report():
    # Your logic to create a markdown report
    return "Your markdown content here"

@flow
def my_flow():
    report = create_markdown_report()
    
    # SlackTask to post message (ensure you've configured your webhook URL)
    slack_task = SlackTask()
    slack_message = f"Markdown Report:\n{report}\nFlow Run Link: YOUR_FLOW_RUN_URL"
    
    # Posting to Slack
    slack_task(message=slack_message)

if __name__ == "__main__":
    my_flow()
Replace "YOUR_FLOW_RUN_URL" with the dynamically generated URL for your flow run, and configure
SlackTask
with your webhook URL as per the instructions on the prefect-slack repository. Please note, always test your code thoroughly, as it seems right up my alley of luck to make things more complex than necessary.
g
@Marvin how can I get the url of my flow run?
m
To get the URL of your flow run programmatically, you often need to construct the URL by combining the base URL of the Prefect UI with the specific flow run ID. The Prefect documentation does not seem to provide a direct excerpt on obtaining the flow run URL, so here's a general approach: 1. Use the
prefect.runtime
module to get the current flow run ID within your flow. 2. Construct the URL manually. The flow run URL typically follows the pattern:
<https://cloud.prefect.io/<tenant_slug>/flow-run/<flow_run_id>>
Here's how you might do it in your flow:
Copy code
python
from prefect import flow
from prefect.runtime import get_run_context

@flow
def my_flow():
    context = get_run_context()
    flow_run_id = context.flow_run_id
    tenant_slug = 'your-tenant-slug'  # Replace with your actual tenant slug
    flow_run_url = f'<https://cloud.prefect.io/{tenant_slug}/flow-run/{flow_run_id}>'
    return flow_run_url

if __name__ == "__main__":
    flow_run_url = my_flow()
    print("Flow run URL:", flow_run_url)
Remember that you need to replace
your-tenant-slug
with the actual tenant slug for your Prefect Cloud workspace. You can typically find this in the base URL when you're logged into the Prefect Cloud UI. Since my capacity for joy is minimal, I suggest verifying the correctness of the URL pattern for your specific Prefect setup, as they can vary. Please note that this code assumes you are using Prefect Cloud. If you are using Prefect Server, the base URL will be different.
g
@Marvin can I pass variables to a state change hook? for example, if my flow generates text, can I have it sent to a slack notification on completion?
m
Yes, you can pass variables to a state change hook to send a notification, like a Slack message, upon the completion of your flow. Prefect supports the addition of custom state change hooks that can be triggered when a flow run changes its state. To have text sent to a Slack notification on completion, you need to define a flow run state change hook and use it with a Slack task that sends the message. The state change hook can have access to the current flow, flow run, and state information and can utilize these details to perform custom actions, such as sending notifications. Here's an example of how to use a state change hook with Slack integration:
Copy code
python
from prefect import flow, task
from prefect.tasks import SlackTask
from prefect.tasks.secrets import PrefectSecret

# Define a state change hook that sends a message to Slack
def my_flow_hook(flow, flow_run, state):
    # Check if the state is Completed
    if state.is_completed():
        # Retrieve Slack webhook URL from Prefect Secret
        slack_webhook_url = PrefectSecret("SLACK_WEBHOOK_URL").run()
        
        # Construct the message
        message = f"Flow '{flow.name}' has completed with state '{state.name}'. Generated text: {state.result}"
        
        # Send the message to Slack
        slack_task = SlackTask()
        slack_task(message=message, webhook_url=slack_webhook_url)

# Define a task that generates some text
@task
def generate_text():
    return "Generated text by the flow."

@flow(on_completion=[my_flow_hook])  # Register the state change hook
def my_flow():
    # Generate text within the flow
    generated_text = generate_text()
    # You can set the result of the flow run to be the generated text
    return generated_text

if __name__ == "__main__":
    my_flow()
In this example, when the flow completes, the state change hook
my_flow_hook
is invoked. It uses the state of the flow run to determine if the flow is completed and then sends a message to Slack with the generated text. Make sure to replace
"SLACK_WEBHOOK_URL"
with the name of the secret that stores your Slack webhook URL, and have this secret set up in your Prefect secrets manager. Please note that this example assumes you have correctly configured the Slack webhook URL as a Prefect secret and have the necessary permissions to Post messages to your preferred Slack channel. The example will not work out of the box without setting up the requisite configurations.
g
@Marvin can I add any other parameters to
my_flow_hook
?
m
State change hooks in Prefect are designed with a specific signature that needs to be adhered to for the hooks to work properly. The signature for a flow run state change hook is as follows:
Copy code
python
def my_flow_hook(flow: Flow, flow_run: FlowRun, state: State, old_state: State):
    # Your hook logic here
This signature cannot be arbitrarily changed to include additional parameters directly. However, if you need to use additional data within your state change hook, you have a couple of options: 1. Using Prefect Context or Global State: You can store data in the Prefect global context or the flow's state attributes and access it within the hook. For instance, you might add additional information to the flow's state during its execution, which you can later access within the hook:
Copy code
python
from prefect import flow, task
from prefect.engine.state import State

@task
def add_data_to_state():
    # Your logic to add data to state
    State.add_data('key', 'value')

@flow(on_completion=[my_flow_hook])  # Register the state change hook
def my_flow():
    # ...
    add_data_to_state()
    # ...

def my_flow_hook(flow, flow_run, state, old_state):
    # Access additional data added to state during the flow run
    additional_data = state.get_data('key')
    # Your hook logic using additional_data
2. Partial Functions: You can use partial functions from the
functools
module to "attach" additional parameters to the hook. These additional parameters will be available when the hook is called by the flow:
Copy code
python
from prefect import flow, task
from prefect.engine.state import State
from functools import partial

def my_custom_hook(flow, flow_run, state, old_state, my_param):
    # Your hook logic here using my_param
    pass

# During flow registration, create a partial function with your additional parameters
my_flow_hook = partial(my_custom_hook, my_param='my_additional_value')

@flow(on_completion=[my_flow_hook])  # Register the state change hook
def my_flow():
    # ...
    pass

if __name__ == "__main__":
    my_flow()
By using a partial function, you're effectively creating a new function with some of its parameters pre-filled. However, be careful when adopting this approach because it deviates from the usual hook pattern, which may impact readability and maintainability for others who are familiar with Prefect's typical usage patterns.
g
@Marvin what will state.result be if the pipeline fails?
m
When a flow run fails, the
state.result
attribute in Prefect typically contains the error that caused the failure. Prefect uses internal result types to capture information about the result attached to a state. For a failed flow run, you can expect the result to be an instance of
UnpersistedResult
,
LiteralResult
, or
PersistedResult
, which includes an error message or exception as its value. These result objects provide a
get()
method that can be called to return the value (or exception) associated with the failed state. This value is automatically retrieved when you use the
result()
method on states. For implementing a state change hook to handle a failed flow run, you can access the error details via
state.result
and then use that information for your Slack message or any other notification logic you want to implement. Here's an example of what you might do in such a state change hook:
Copy code
python
def my_failure_hook(flow, flow_run, state):
    if state.is_failed():
        failure_message = f"Flow '{flow.name}' failed: {state.result}"
        # logic to send failure_message to Slack or perform other actions
The exact representation of
state.result
in the context of a failed flow depends on the nature of the failure and how results are configured in your workflow. Remember, when interacting with the
state.result
, handle potential exceptions properly and log or handle cases where the result might not have the information you expect. For more details on working with states and results, you can refer to Prefect's documentation about results and state change hooks.
g
@Marvin my flow return text. I am trying to include this in my message for my on_completion state change hook. My notify_slack function includes
state.result
but this is printing out as a bound method, not the actual text my flow is returning. How do I include the text my flow is returning in the state change hook? this is what is being returned <bound method State.result of Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`'))>
m
The
state.result
attribute is a method that needs to be called in order to access the actual result of the flow run. In your
notify_slack
function within the state change hook, you are likely missing the parentheses to call the
result()
method. Instead of passing
state.result
directly, you need to call the method and pass its result, which will be the text your flow is returning. Here's the corrected approach:
Copy code
python
def notify_slack(flow, flow_run, state):
    if state.is_completed():
        # call .result() to get the actual text result from the flow
        result_text = state.result()  # This will get the value returned by your flow
        slack_message = f"Flow completed with result: {result_text}"
        # Send slack_message to Slack using your preferred method
        # ...
Remember to call
state.result()
with the parentheses to actually access the return value of your flow. If you pass
state.result
without the parentheses, it will indeed give you the string representation of the method object rather than the value. Here is a more complete example with a dummy Slack notification function:
Copy code
python
from prefect import flow, task

@task
def generate_text():
    return "Generated text by the flow."

@flow
def my_flow():
    text = generate_text()
    return text

# Imagine this function sends a notification to slack
# You'll need to implement the actual logic!
def send_slack_notification(message):
    print(f"Sending Slack notification: {message}")

def notify_slack(flow, flow_run, state):
    if state.is_completed():
        result_text = state.result()  # Call the method to get the value
        slack_message = f"Flow '{flow.name}' completed with result: {result_text}"
        send_slack_notification(slack_message)

# Running the flow with the notify_slack hook on completion
my_flow(on_completion=[notify_slack])
By using
state.result()
in your state change hook, you will correctly retrieve the result provided by the flow upon its completion.