
Kamil Gorszczyk

06/20/2021, 3:23 PM
Hi everyone! For the past few days I’ve been seeing strange failures and exceptions that never happened before. I’m using the ShellTask to run a Java application on our Kubernetes cluster. The ShellTask has a custom state handler that sends notifications when the shell command fails and/or retries. It also uses a lambda function to generate the task_run_name from the executed command:
executeShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split('-file ')[1].split(' -')[0].rpartition('/')[-1]}",
    stream_output=True, max_retries=RETRY_MAX, retry_delay=timedelta(minutes=RETRY_TIMEOUT), timeout=TIMEOUT, state_handlers=[taskStateHandler])
def taskStateHandler(obj: Task, old_state: State, new_state: State):
    try:
        if new_state.is_retrying():
            sendTelegramNotification("Task {0} failed and is retrying at {1}".format(obj.task_run_name, new_state.start_time))
        return new_state
    except:
        sendTelegramNotification("Task {0} failed with an exception".format(obj.name))
        return new_state
Since task_run_name is a lambda function, I just called task.task_run_name() to get the resolved name. But for about a week now, Prefect has been raising the following exception:
Exception raised while calling state handlers: KeyError('command')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/cloud/task_runner.py", line 64, in call_runner_target_handlers
    new_state = super().call_runner_target_handlers(
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 114, in call_runner_target_handlers
    new_state = handler(self.task, old_state, new_state) or new_state
  File "prefect-master.py", line 200, in taskStateHandler
  File "prefect-master.py", line 222, in <lambda>
KeyError: 'command'
I just can’t figure out what I’m missing or doing wrong… I even reverted to 0.14.19, where I thought it worked, but I get the same exception.
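The traceback is consistent with the lambda being invoked without a `command` keyword argument. A minimal sketch that reproduces the same KeyError in plain Python, without Prefect, is shown here. The `run_name` function mirrors the lambda from the message above; the command string is a hypothetical example, not the real one.

```python
# Same logic as the task_run_name lambda, written as a named function.
def run_name(**kwargs):
    return kwargs['command'].split('-file ')[1].split(' -')[0].rpartition('/')[-1]

# Called with the task's inputs, it resolves to the file name.
# The command below is a made-up example.
name = run_name(command="java -jar app.jar -file /data/in/report.csv -verbose")

# Called with no keyword arguments (as in task.task_run_name()),
# there is no 'command' key to look up, so it raises KeyError.
try:
    run_name()
    error = None
except KeyError as exc:
    error = repr(exc)

print(name)
print(error)
```

In other words, the callable only works when it receives the task's inputs, which a bare call from a state handler does not supply.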

Kevin Kho

06/21/2021, 1:19 PM
Hi @Kamil Gorszczyk, I’ll look into this

Zach Angell

06/21/2021, 4:30 PM
Hi @Kamil Gorszczyk, unfortunately `task.task_run_name` is context dependent. We use this callable to set the task run name in the backend but never store the `task_run_name` property in the Python code. I'll open up an issue on GitHub; I think it makes sense to put this in `prefect.context`. For the time being, the workaround is to query the Prefect API to get the name. Please note this will only work when running against a backend. This code example is working for me:
from prefect import Flow, task, Task
import prefect
from prefect.utilities.graphql import with_args
from prefect.tasks.shell import ShellTask
from datetime import timedelta


def taskStateHandler(obj: Task, old_state, new_state):
    logger = prefect.context.get("logger")
    # you may need to briefly sleep here to make sure the task has been renamed, otherwise
    # the result may come back as 'None'
    import time
    time.sleep(10)

    # use the task run id to get task_run_name
    client = prefect.Client()
    task_run_id = prefect.context.get("task_run_id")
    task_run_name_info = client.graphql(
                {
                    "query": {
                        with_args("task_run_by_pk", {"id": task_run_id}): {"name"}
                    }
                }
            )
    # log using the task run name
    logger.info("Task {0} testing".format(task_run_name_info.data.task_run_by_pk.name))
    return new_state


executeShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split(' ')}",
    stream_output=True, max_retries=2, retry_delay=timedelta(seconds=30), state_handlers=[taskStateHandler])

with Flow("shell command task name") as flow:
    executeShell(command = "echo 'test'")

flow.register("test")
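The fixed `time.sleep(10)` in the handler is there because the name may still come back as `None` right after the task starts. One possible alternative is to poll a few times with a short delay. The sketch below is only an illustration: `wait_for_name`, `fetch`, `attempts`, and `delay` are hypothetical names, not part of Prefect, and `fetch` stands in for whatever function performs the GraphQL query and returns the name.

```python
import time
from typing import Callable, Optional


def wait_for_name(
    fetch: Callable[[], Optional[str]],
    attempts: int = 5,
    delay: float = 2.0,
) -> Optional[str]:
    """Call fetch() until it returns a non-empty name or attempts run out."""
    for attempt in range(attempts):
        name = fetch()
        if name:
            return name
        # Wait before the next try, but not after the final one.
        if attempt < attempts - 1:
            time.sleep(delay)
    return None
```

Inside the state handler, `fetch` could wrap the `client.graphql(...)` call and return `task_run_by_pk.name`; if the result is still `None` after all attempts, falling back to `obj.name` would keep the notification from failing.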