Hi there! I have a question about task inputs and ...
# prefect-community
b
Hi there! I have a question about task inputs and triggers.
I've lightly adapted the sample code from the triggers documentation, shown below. This does what I want it to do. However, I would like to be able to pass a result from the
do_something
task to the
cleanup_on_failure
task. What is the correct way to do this?
Copy code
import random

from prefect.triggers import all_failed
from prefect import task, Flow


@task(name="Do Something")
def do_something():
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occured.")


@task(name="Cleanup Task", trigger=all_failed)
def cleanup_on_failure():
    pass


with Flow("Trigger example") as flow:
    fail = cleanup_on_failure(upstream_tasks=[do_something])
    flow.set_reference_tasks([do_something])
Follow-up question... is it possible for a task to fail but also return data for downstream consumption?
a
If do_something fails, the result of it will be an exception. If you want to pass this exception to the cleanup task, you can do that the same way as you would normally pass data between tasks:
Copy code
import random

from prefect.triggers import all_failed
from prefect import task, Flow


@task(name="Do Something")
def do_something(x):
    if random.random() > 0.5:
        raise ValueError(f"Non-deterministic error has occured. Got value {x}")
    return x


@task(name="Cleanup Task", trigger=all_failed, log_stdout=True)
def cleanup_on_failure(x):
    print(f"Cleaning up. Got value {x}")


with Flow("Trigger example") as flow:
    sth = do_something(2)
    fail = cleanup_on_failure(sth)
    flow.set_reference_tasks([sth])

if __name__ == "__main__":
    flow.run()
The above flow will pass the exception message "Got value Non-deterministic error has occured. Got value 2" to the cleanup task, when it fails. The follow up question: yes, it's possible to pass data to downstream task even it the task fails, but this data will be an exception message - if you need that e.g. to pass the reason why a task failed to a failure notification task, this should work
b
Thank you @Anna Geller that makes sense. Do you think it would be possible to subclass the
Failed
state and add a custom result attribute to it? Since the exception would be passed as the
state.message
if I understand correctly.
What I'm actually trying to do is a series of small tasks (say 100 of them), expecting that a handful will fail. I'd like to do one notification/cleanup at the end where I send a list of the failed tasks somewhere (likely Slack). I am trying to batch all of the failures together as one message rather than send one individual slack message per failed task.
a
I see. Thanks for explaining the problem. In that case, I would approach it differently. I would create another task and set dependencies so that:
Copy code
@task
def notify_on_all_failures():
    pass

with Flow() as flow:
   t1 = task1()
   t2 = task2()
   notify_on_all_failures(upstream_tasks=[t1, t2], trigger=any_failed)
This way, if any of those upstream tasks fails, you will send a single notification on failure.
b
Would that include exception info from each failed task?
a
no, it would only include the info that some of those upstream tasks failed. Do you think is it critical for you to get all those exceptions via Slack or so? You could e.g. send a link to the flow run page in your Slack message allowing to easily click and inspect further in the UI.
Copy code
team_slug = "anna-prefect" # replace by your team-slug
run_id = prefect.context.get("flow_run_id")
url = f"<https://cloud.prefect.io/{team_slug}/flow-run/{run_id}>"
b
Ok that is very helpful. I have some more thinking to do… thank you!
a
but if you think getting those exceptions is critical, you would need to dig a bit deeper and I can't figure out any clean and straightforward way to do it other than e.g. querying the backend within this single
notify_on_all_failures
task. Definitely possible using FlowRunView and TaskRunView, but involves a bit more work
if you happen to do that. I would be super grateful if you could share your solution. I saw a similar request before 1-2 times but didn't see anyone eventually doing that and sharing their approach 🙂
b
Ok cool. I have some ideas, we'll see if I can make it a little bit generic enough.
In case it's of interest, I have been working on generic state handler stuff to put in my flows. It isn't very applicable in the case I've been asking about, but is pretty useful more generally.
Copy code
def is_flow(obj):
    return isinstance(obj, prefect.Flow)


def is_task(obj):
    return isinstance(obj, prefect.Task)


def get_cloud_url(tracked_obj: TrackedObjectType):
    """
    Return contextually appropriate cloud URL
    """
    if flow_run_id := prefect.context.get("flow_run_id"):
        if is_flow(tracked_obj):
            return prefect.client.Client().get_cloud_url("flow-run", flow_run_id)
        elif is_task(tracked_obj):
            return prefect.client.Client().get_cloud_url(
                "task-run", prefect.context.get("task_run_id", "")
            )


def construct_exception_blocks(
    tracked_obj: TrackedObjectType, state: "prefect.engine.state.State",
):
    """
    Construct blocks to be appended to main message with contextual error info.
    This function can handle flows or tasks in a failed state. Every failed task
    within a failed flow will be appended as a separate block.
    Blocks can hold up to 3000 characters.
    Args:
        tracked_obj: (prefect.Flow, prefect.Task) Object containing error information.
        state: (prefect.engine.state.State) State containing error information.
    """
    blocks = []
    try:
        if is_task(tracked_obj):
            exception_block = {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"```{repr(state.result)[:2990]}```",  # stay under block limit of 3000 chars
                },
            }
            blocks.append(exception_block)
        elif is_flow(tracked_obj):
            for result in state.result.values():
                if result.is_failed():
                    exception_block = {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": f"```{result.message[:2990]}```",  # stay under block limit of 3000 chars
                        },
                    }
                    blocks.append(exception_block)
    except:
        blocks.append(
            {
                "type": "section",
                "text": {
                    "type": "plain_text",
                    "text": "Unable to construct exception block, see logs for details.",
                },
            }
        )

    return blocks


def construct_parameter_block():
    """
    Return block with pretty printed json of parameters, if any.
    """
    if parameters := prefect.context.get("parameters", {}):
        return {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*Parameters*:\n```{json.dumps(parameters, indent=2, sort_keys=True)}```",
            },
        }


def construct_message_block(mrkdwn_text):
    """
    Return block containing an arbitrary messaage
    Params
        mrkdwn_text: markdown-formatted text string
    
    For reference, see: <https://api.slack.com/messaging/composing/formatting>
    """
    return {"type": "section", "text": {"type": "mrkdwn", "text": mrkdwn_text}}


def construct_payload(
    tracked_obj: TrackedObjectType,
    state: "prefect.engine.state.State",
    custom_message_callback: Callable = None,
):
    """
    Construct standard payload using Slack's Block Kit.
    Includes parameter and exception information when relevant.
    Use optional parameter `custom_message_callback` to add a custom message to the end of the payload.
    
    For reference details, see: <https://api.slack.com/block-kit>
    Args:
        tracked_obj: (prefect.Flow, prefect.Task) The object in question, which will be
            inspected to produce an appropriate message.
        state: (prefect.engine.state.State) The state that the tracked object has
            transitioned to. State details are used to enrich the message payload.
    """
    project_name = prefect.context.get("project_name", "< Project Name Not Found >")
    flow_name = prefect.context.get("flow_name", "< Flow Name Not Found >")
    url = get_cloud_url(tracked_obj)

    blocks = [
        {"type": "divider"},
        {
            "type": "section",
            "fields": [
                {"type": "mrkdwn", "text": "*Status*"},
                {"type": "mrkdwn", "text": "*Project*"},
                {"type": "mrkdwn", "text": f"*<{url}|{type(state).__name__}>*",},
                {"type": "plain_text", "text": project_name},
                {"type": "mrkdwn", "text": "*Flow*"},
                {"type": "mrkdwn", "text": "*Message*"},
                {"type": "plain_text", "text": flow_name},
                {"type": "plain_text", "text": getattr(state, "message", "< None >")},
            ],
        },
    ]
    if parameter_block := construct_parameter_block():
        blocks.append(parameter_block)
    if state.is_failed():
        exception_blocks = construct_exception_blocks(tracked_obj, state)
        blocks.extend(exception_blocks)
    if custom_message_callback:
        blocks.append(custom_message_callback())

    return {"blocks": blocks}


def make_slack_notification(
    webhook_parameter_name: str, custom_message_callback: Callable = None
):
    """
    Returns a callable which can be supplied to the callback_factory.
    This function can be incorporated into state handlers for tasks or flows.
    """

    def _slack_notification(
        obj: TrackedObjectType, new_state: "prefect.engine.state.State"
    ):
        """
        State handler function. Posts useful information to Slack, including:
        * Status, Project, Flow, Parameter(s), Error Message(s)
        """
        slack_webhook_url = get_slack_webhook_url(webhook_parameter_name)
        payload = construct_payload(obj, new_state, custom_message_callback)
        resp = <http://requests.post|requests.post>(slack_webhook_url, json=payload)
        if resp.status_code != 200:
            # If original message was malformed or there was some other issue,
            # try to ensure that _something_ is posted.
            <http://requests.post|requests.post>(
                slack_webhook_url,
                json={
                    "text": f"{obj} is in state {new_state}\nFull response is unavailable at this time, please review logs."
                },
            )

    return _slack_notification


def state_handler_factory(
    webhook_parameter_name: str = None,
    check: Callable = None,
    custom_message_callback: Callable = None,
):
    """
    Use this function to build a custom state handler for prefect Flows or Tasks.
    Args:
        - webhook_parameter_name (str): an SSM parameter that should resolve to a Slack Webhook URL
        - check (Callable): a function with signature `check(state: State) -> bool`
            that is used for determining when the callback function should be called
        - custom_message_callback (Callable): a function that will be called during execution
            of the state handler, and will return a Slack Block Kit block.
    """
    if check is None:
        check = lambda state: state.is_failed()
    return callback_factory(
        fn=make_slack_notification(
            webhook_parameter_name=webhook_parameter_name,
            custom_message_callback=custom_message_callback,
        ),
        check=check,
    )
I think maybe an adaptation of the
construct_exception_blocks
function may be the way to go but we'll see if it works.
a
Wow, so nicely modular, thanks for sharing! Will share on Discourse tomorrow. Keep us posted on your progress and if we can help in some way
b
Thanks! It ends up looking like this, which I find very useful, especially for a relatively non-technical audience
k
That looks good! But just mentioning that yes you can subclass the Failed state I believe because the downstream code use
isinstance(state, Failed)
and this will resolve to True so you can subclass the Failed state and embed an object in there too for later retrieval. I’m positive at least.
b
I am noticing that there is a difference between the state objects available within a state handler for tasks vs flows. It seems easier to access the actual
Exception
object from a task state handler, while I only see the string representation of the Exception in a flow state handler.
k
This is true because flow level state handlers don’t hold all of the results, but the task state handler holds the immediate result I believe.
upvote 1
b
I suppose that makes sense since there can be a huge number of dynamic tasks.
This code is moving in the direction I'm trying to go... it may not be very Prefect-y but gets closer to my goal.
Copy code
from prefect import Flow, task


class MySpecialException(Exception):
    def __init__(self, message, detail):
        self.message = message
        self.detail = detail
        super().__init__(message)


class ImportantThing:
    def __init__(self):
        self.failed = []

    def run(self, inputs):
        for input in inputs:
            try:
                input / 0
            except:
                self.failed.append(input)


def state_handler(obj, old, new):
    if new.is_failed():
        print("; ".join([str(x) for x in new.result.detail]))


@task(state_handlers=[state_handler])
def do_the_thing():
    thing = ImportantThing()
    thing.run(inputs=[1, 2, 3, 4, 5])
    if thing.failed:
        raise MySpecialException("Some failures", detail=thing.failed)


with Flow("Thing Do") as flow:
    do_the_thing()


res = flow.run()
k
Exactly yes! This is one way to bring anything from the Task to the state handler
🚀 2
a