# ask-community
z
Maybe I'm doing something wrong, but I've noticed that terminal state handlers behave differently in Cloud than they do locally. Locally, the terminal state handler always does what I'd expect: it runs once the flow is in its final state. In Cloud, though, I've seen flow runs with long-running tasks where the terminal state handler is called while the flow is still in a Running state.
Maybe the answer is that I'm not using them right and should be using normal state handlers, since I only want things to run when we go from Running to any other state.
n
Hi @Zach Schumacher - could you share some minimum reproducible code?
z
from typing import Set, Optional

from prefect import task, Flow
from prefect.engine.state import State
from prefect.utilities.logging import get_logger

logger = get_logger("example-terminal-state-behavior")


def custom_terminal_state_handler(
        flow: Flow,
        state: State,
        reference_task_states: Set[State],
) -> Optional[State]:
    if state.is_failed():
        for task, task_state in state.result.items():
            logger.info(f"task.name={task.name} state={task_state}")
    return state


@task()
def abc(x):
    return x


@task()
def bcd(x):
    raise ValueError("Foo!")


@task()
def cde(x):
    return x


with Flow(
        "example-terminal-state-behavior", terminal_state_handler=custom_terminal_state_handler
) as flow:
    a = abc(1)
    b = bcd(a)
    c = cde(b)
flow.run()
Basically, I'm seeing inconsistent contents in state.result between running locally and running in Cloud.
For example, one flow run had nothing in result, and another showed the tasks as still running even though they had all finished successfully.
To clarify, it always works fine locally; I only see this behavior in Cloud.
We run flows on a pretty vanilla Kubernetes run config and agent.
n
Got it - let us test this a bit and get back to you
z
maybe an elegant solution would be storing task name and task state in context? What I’m trying to accomplish is writing a rich slack alert when a flow finishes that shows the final state of the flow and the final state of each task in the flow
n
Ok sorry - the issue here is that terminal state handlers operate differently between Core and Cloud, mainly due to latency and not all information being available to the terminal handler at execution time. This is something we're looking to improve but in the meantime a straightforward way to handle this would be to add a terminal task instead that runs last and can query for the information you need using the client before sending it to Slack. This gives you the added benefit of being able to track your Slack task as part of your flow (and even reuse it between flows)
z
How can I access the state of other tasks?
n
You can do something like this:
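import prefect
from prefect import Client
from prefect.tasks.notifications import SlackTask


# Subclass SlackTask directly; the @task decorator is for functions, not Task classes.
class SlackNotifier(SlackTask):
  def run(self, webhook_secret):
    flow_run_id = prefect.context.get("flow_run_id")
    client = Client()
    # Braces are doubled because this is an f-string; the id is passed as a quoted string.
    query = f"""
      query {{
        flow_run_by_pk(id: "{flow_run_id}") {{ id, state, task_runs {{ id, state }} }}
      }}
    """

    res = client.graphql(query)
    # parse response for flow run state and task run states

    self.message = "hello"
    self.webhook_secret = webhook_secret

    super(SlackNotifier, self).run()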
z
Ah so I have to use the api. In that case I’ll just use a vanilla state handler and check that the state is finished. Will that work in cloud?
n
That should work, though you'll still want to use the API to grab all your states. See this issue on the terminal state handler: https://github.com/PrefectHQ/prefect/issues/4570
z
Yeah I figured. Thanks for the help on this!
n
Yup yup! 🙂
z
So I guess now I have a solution that will never work locally but will always work in cloud. Not ideal but should work for now! Any idea if addressing that is on the road map?
n
Definitely is! We've got some good stuff in the works but I don't have a good timeline yet
z
Looks like there is still latency even over the API. I have an "is finished" check in my state handler and I'm still unable to get the final status of the flow or the tasks.
Here is my handler:
def get_slack_state_handler(
    webhook_url: str, message_factory: MessageFactory = _get_slack_message
) -> StateHandlerType:  # pragma: no cover
    """
    Take the Slack webhook URL as an argument (so this stays unopinionated about where it comes from) and allow
    overriding how the Slack message is created (via message_factory).

    The default behavior is a slack message with one attachment for each prefect task.
    """

    def slack_state_handler(flow: Flow, old_state: State, new_state: State):
        """
        We have to fetch task status from the graph API when running in cloud (see issue referenced above). This
        code can go away when terminal state handlers work correctly in cloud/k8s
        """
        # this only works when running in prefect cloud
        if new_state.is_finished() and os.getenv("ENVIRONMENT") in {"dev", "staging", "prod"}:
            from prefect import context

            flow_run_id = context.get("flow_run_id")
            flow_run = get_flow_run(flow_run_id)
            slack_message = message_factory(flow_run)
            sbrequests.post(webhook_url, json=slack_message)

    return slack_state_handler
and my query
def get_flow_run(flow_run_id: str) -> PrefectFlowRun:
    client = Client()
    response = client.graphql(
        f"""
        query {{ 
          flow_run_by_pk ( id: "{flow_run_id}" ),
        {{ 
            id, 
            flow {{
              id,
              name,
              version
            }}
            state,
            start_time,
            end_time,
            state_message 
            task_runs {{
              id, 
              state, 
              state_message,
              start_time,
              end_time,
              task {{
                id,
                name,
                slug,
                type
              }}
            }}
          }}
        }}"""
    )
    return PrefectFlowRun.from_orm(response.data.flow_run_by_pk)
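For context, here is a rough sketch of the kind of message factory the handler could be given: one Slack attachment per task run, as the docstring describes. It is not the real `_get_slack_message`; `build_slack_message` and `COLORS` are hypothetical names, and it assumes the flow run arrives as a plain dict shaped like the query above rather than as a `PrefectFlowRun`.

```python
from typing import Any, Dict, List

# Slack's legacy attachment colors; anything not listed falls back to "warning".
COLORS = {"Success": "good", "Failed": "danger"}


def build_slack_message(flow_run: Dict[str, Any]) -> Dict[str, Any]:
    """Build a webhook payload with one attachment per task run (hypothetical helper)."""
    attachments: List[Dict[str, str]] = []
    for task_run in flow_run.get("task_runs", []):
        text = task_run["state"]
        # state_message can be null in the API response, so only append it when present.
        if task_run.get("state_message"):
            text += f": {task_run['state_message']}"
        attachments.append(
            {
                "title": task_run["task"]["name"],
                "text": text,
                "color": COLORS.get(task_run["state"], "warning"),
            }
        )
    return {
        "text": f"Flow {flow_run['flow']['name']} finished in state {flow_run['state']}",
        "attachments": attachments,
    }
```

The returned dict is what would be posted as JSON to the webhook URL.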
+ @Kevin Kho for visibility. Curious whether there is an elegant solution to this. Should I just sleep for 5 seconds in my handler?
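An alternative to a fixed sleep would be a bounded poll: re-run the query a few times and stop as soon as every task run reports a finished state. Below is a minimal sketch of that idea. `wait_for_task_runs` and the `FINISHED` set are hypothetical, the state names are an assumption about what the API returns, and it only waits on task runs, on the further assumption that the flow's own final state is already available to the handler as `new_state`.

```python
import time
from typing import Any, Callable, Dict, FrozenSet

# Assumption: names the API reports for task runs that are done.
FINISHED: FrozenSet[str] = frozenset(
    {"Success", "Failed", "Cancelled", "Skipped", "TimedOut", "TriggerFailed", "Cached", "Mapped"}
)


def wait_for_task_runs(
    fetch: Callable[[], Dict[str, Any]],
    attempts: int = 5,
    delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Call fetch() until all task runs look finished or the attempts run out.

    Returns the last response either way, so the caller can still send an alert.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    result = fetch()
    for _ in range(attempts - 1):
        runs = result.get("task_runs") or []
        # An empty list is treated as "not settled yet" rather than "all done".
        if runs and all(run["state"] in FINISHED for run in runs):
            break
        sleep(delay_seconds)
        result = fetch()
    return result
```

In the handler, `fetch` would be something like a lambda wrapping the GraphQL query for the current flow run id, returning the flow run as a dict. The worst case waits `(attempts - 1) * delay_seconds`, so the defaults cap the delay at about 4 seconds.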
k
Yeah will ask the team about it