# prefect-server
Hi everyone! For a few days now, I’ve been seeing some strange failures and raised exceptions which haven’t happened before. I’m using the ShellTask to run a Java application on our Kubernetes cluster. The ShellTask has a custom state handler to send notifications in case the shell command fails and/or retries. It also uses a lambda function to generate the task_run_name from the executed command:
executeShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split('-file ')[1].split(' -')[0].rpartition('/')[-1]}",
    stream_output=True, max_retries=RETRY_MAX, retry_delay=timedelta(minutes=RETRY_TIMEOUT), timeout=TIMEOUT, state_handlers=[taskStateHandler])
def taskStateHandler(obj: Task, old_state: State, new_state: State):
    if new_state.is_retrying():
        sendTelegramNotification("Task {0} failed and is retrying at {1}".format(obj.task_run_name, new_state.start_time))
    elif new_state.is_failed():
        sendTelegramNotification("Task {0} failed with an exception".format(obj.name))
    return new_state
Since task_run_name is a lambda function, I just called task.task_run_name() to get the resolved name. But for about a week now, Prefect has been raising the following exception:
Exception raised while calling state handlers: KeyError('command')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/cloud/task_runner.py", line 64, in call_runner_target_handlers
    new_state = super().call_runner_target_handlers(
  File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 114, in call_runner_target_handlers
    new_state = handler(self.task, old_state, new_state) or new_state
  File "prefect-master.py", line 200, in taskStateHandler
  File "prefect-master.py", line 222, in <lambda>
KeyError: 'command'
I just can’t figure out what I’m missing or doing wrong… I even reverted to 0.14.19, where I thought it worked, but I get the same exception.
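The KeyError in the traceback can be reproduced without Prefect. The following is a minimal sketch, assuming the callable is normally invoked with the task's inputs as keyword arguments and that the handler called it with none. The command string is hypothetical and only there for illustration.

```python
# Same name-building lambda as in the ShellTask definition above.
task_run_name = lambda **kwargs: (
    f"{kwargs['command'].split('-file ')[1].split(' -')[0].rpartition('/')[-1]}"
)

# Hypothetical command, made up for this example.
command = "java -jar app.jar -file /data/in/report.xml -verbose"

# With the input supplied, the lambda resolves to the file name.
resolved = task_run_name(command=command)
print(resolved)

# Called with no arguments, kwargs is empty and the 'command' lookup fails.
error = None
try:
    task_run_name()
except KeyError as exc:
    error = exc
    print(repr(exc))
```

So the lambda itself is fine; it only fails when nothing passes `command` to it, which is what happens when it is called directly inside the state handler.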
Hi @Kamil Gorszczyk, I’ll look into this
Hi @Kamil Gorszczyk, unfortunately task_run_name is context dependent. We use this callable to set the task run name in the backend but never store the property in the Python code. I'll open up an issue on GitHub; I think it makes sense to put this in […]. For the time being, the workaround is to query the Prefect API to get the name. Please note this will only work when running against a backend. This code example is working for me:
from prefect import Flow, task, Task
import prefect
from prefect.utilities.graphql import with_args
from prefect.tasks.shell import ShellTask
from datetime import timedelta

def taskStateHandler(obj: Task, old_state, new_state):
    logger = prefect.context.get("logger")
    # you may need to briefly sleep here to make sure the task has been renamed, otherwise
    # the result may come back as 'None'
    import time
    time.sleep(1)  # assumed delay; adjust as needed

    # use the task run id to get task_run_name
    client = prefect.Client()
    task_run_id = prefect.context.get("task_run_id")
    task_run_name_info = client.graphql(
        {
            "query": {
                with_args("task_run_by_pk", {"id": task_run_id}): {"name"}
            }
        }
    )
    # log using the task run name
    logger.info("Task {0} testing".format(task_run_name_info.data.task_run_by_pk.name))
    return new_state

executeShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split(' ')}",
    stream_output=True, max_retries=2, retry_delay=timedelta(seconds=30), state_handlers=[taskStateHandler])

with Flow("shell command task name") as flow:
    executeShell(command = "echo 'test'")
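The comment in the handler notes that the name may come back as 'None' if the query runs before the task run has been renamed. Instead of a fixed sleep, one option is to poll a few times. This is only a sketch: `wait_for_name` and its parameters are hypothetical helpers, not part of Prefect, and the lookup is replaced by a stand-in so the example runs on its own.

```python
import time
from typing import Callable, Optional


def wait_for_name(
    fetch_name: Callable[[], Optional[str]],
    attempts: int = 5,
    delay: float = 1.0,
) -> Optional[str]:
    """Call fetch_name until it returns a non-None value or attempts run out."""
    for attempt in range(attempts):
        name = fetch_name()
        if name is not None:
            return name
        # Sleep only if another attempt follows.
        if attempt < attempts - 1:
            time.sleep(delay)
    return None


# Stand-in for the GraphQL lookup: returns None twice, then a name.
_responses = iter([None, None, "report.xml"])
result = wait_for_name(lambda: next(_responses), attempts=5, delay=0.01)
print(result)
```

In the state handler, `fetch_name` would wrap the `client.graphql(...)` call and return `task_run_name_info.data.task_run_by_pk.name`.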