Kamil Gorszczyk

    1 year ago
    Hi everyone! For a few days now, I’m having some strange fails and raised exceptions which haven’t happened before. I’m using the ShellTask to run a Java Application on our Kubernetes Cluster. The ShellTask has a custom StateHandler to send notifications in case the Shell fails and/or retries. It also uses a lambda function to generate the task_run_name from the executed command:
    executeShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split('-file ')[1].split(' -')[0].rpartition('/')[-1]}",
        stream_output=True, max_retries=RETRY_MAX, retry_delay=timedelta(minutes=RETRY_TIMEOUT), timeout=TIMEOUT, state_handlers=[taskStateHandler])
    def taskStateHandler(obj: Task, old_state: State, new_state: State):
            if new_state.is_retrying():
                sendTelegramNotification("Task {0} failed and is retrying at {1}".format(obj.task_run_name, new_state.start_time))
            return new_state
            sendTelegramNotification("Task {0} failed with an exception".format(obj.name))
            return new_state
    Since the task_run_name is a lambda function, I just called task.task_run_name() to get the resolved name. But somehow, since a week or so, prefect raises the following exception:
    Exception raised while calling state handlers: KeyError('command')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/cloud/task_runner.py", line 64, in call_runner_target_handlers
        new_state = super().call_runner_target_handlers(
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 114, in call_runner_target_handlers
        new_state = handler(self.task, old_state, new_state) or new_state
      File "prefect-master.py", line 200, in taskStateHandler
      File "prefect-master.py", line 222, in <lambda>
    KeyError: 'command'
    I just can’t figure out what I’m missing or doing wrong… I even reverted to 0.14.19 where I thought it worked but same exception.
    Kevin Kho

    Kevin Kho

    1 year ago
    Hi @Kamil Gorszczyk, I’ll look into this
    Zach Angell

    Zach Angell

    1 year ago
    Hi @Kamil Gorszczyk unfortunately
    is context dependent. We use this callable to set the task run name in the backend but never store the
    property in the Python code I'll open up an issue on GitHub, I think it makes sense to put this in
    . For the time being, the workaround is to query the Prefect API to get the name. Please note this will only work when running against a backend This code example is working for me
    from prefect import Flow, task, Task
    import prefect
    from prefect.utilities.graphql import with_args
    from prefect.tasks.shell import ShellTask
    from datetime import timedelta
    def taskStateHandler(obj: Task, old_state, new_state):
        logger = prefect.context.get("logger")
        # you may need to briefly sleep here to make sure the task has been renamed, otherwise
        # the result may come back as 'None'
        import time
        # use the task run id to get task_run_name
        client = prefect.Client()
        task_run_id = prefect.context.get("task_run_id")
        task_run_name_info = client.graphql(
                        "query": {
                            with_args("task_run_by_pk", {"id": task_run_id}): {"name"}
        # log using the task run name
        <http://logger.info|logger.info>("Task {0} testing".format(task_run_name_info.data.task_run_by_pk.name))
        return new_state
    executeShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split(' ')}",
        stream_output=True, max_retries=2, retry_delay=timedelta(seconds=30) ,state_handlers=[taskStateHandler])
    with Flow("shell command task name") as flow:
        executeShell(command = "echo 'test'")