    Kamil Gorszczyk

    1 year ago
    Hi everyone! For the past few days I’ve been getting strange failures and exceptions that didn’t happen before. I’m using a ShellTask to run a Java application on our Kubernetes cluster. The ShellTask has a custom state handler that sends notifications when the shell command fails and/or retries. It also uses a lambda function to generate the task_run_name from the executed command:
    executeShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split('-file ')[1].split(' -')[0].rpartition('/')[-1]}",
        stream_output=True, max_retries=RETRY_MAX, retry_delay=timedelta(minutes=RETRY_TIMEOUT), timeout=TIMEOUT, state_handlers=[taskStateHandler])
    def taskStateHandler(obj: Task, old_state: State, new_state: State):
        try:
            if new_state.is_retrying():
                sendTelegramNotification("Task {0} failed and is retrying at {1}".format(obj.task_run_name, new_state.start_time))
            return new_state
        except:
            sendTelegramNotification("Task {0} failed with an exception".format(obj.name))
            return new_state
    Since task_run_name is a lambda function, I simply called task.task_run_name() to get the resolved name. But for about a week now, Prefect has been raising the following exception:
    Exception raised while calling state handlers: KeyError('command')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/cloud/task_runner.py", line 64, in call_runner_target_handlers
        new_state = super().call_runner_target_handlers(
      File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 114, in call_runner_target_handlers
        new_state = handler(self.task, old_state, new_state) or new_state
      File "prefect-master.py", line 200, in taskStateHandler
      File "prefect-master.py", line 222, in <lambda>
    KeyError: 'command'
    I just can’t figure out what I’m missing or doing wrong… I even reverted to 0.14.19, where I thought it worked, but I get the same exception.
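
The traceback above is what plain Python produces when a `**kwargs` lambda that indexes `kwargs['command']` is called with no arguments, as `task.task_run_name()` does. The sketch below reproduces that outside Prefect. The function names `run_name` and `safe_run_name` and the sample command string are hypothetical, chosen only for illustration; the parsing expression is the one from the question.

```python
def run_name(**kwargs):
    # Same parsing as the lambda in the question: take the text after
    # "-file ", cut it at the next " -" option, keep the file name only.
    return kwargs["command"].split("-file ")[1].split(" -")[0].rpartition("/")[-1]


def safe_run_name(**kwargs):
    # Hypothetical defensive variant: return None instead of raising
    # when no command is supplied or the "-file " option is absent.
    command = kwargs.get("command")
    if not command or "-file " not in command:
        return None
    return run_name(command=command)


if __name__ == "__main__":
    # Hypothetical command line, used only to exercise the parsing.
    sample = "java -jar app.jar -file /data/in/job1.xml -verbose"
    print(run_name(command=sample))  # job1.xml

    try:
        run_name()  # no keyword arguments, as in task.task_run_name()
    except KeyError as exc:
        print("KeyError:", exc)  # KeyError: 'command'

    print(safe_run_name())  # None
```

A guard like `safe_run_name` only avoids the exception. It does not recover the resolved name inside the state handler, because the inputs are not passed to the callable there.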
    Kevin Kho

    1 year ago
    Hi @Kamil Gorszczyk, I’ll look into this
    Zach Angell

    1 year ago
    Hi @Kamil Gorszczyk, unfortunately task.task_run_name is context dependent. We use this callable to set the task run name in the backend, but we never store the resolved task_run_name property in the Python code. I’ll open an issue on GitHub; I think it makes sense to put this in prefect.context. For the time being, the workaround is to query the Prefect API to get the name. Please note that this only works when running against a backend. This code example is working for me:
    from prefect import Flow, task, Task
    import prefect
    from prefect.utilities.graphql import with_args
    from prefect.tasks.shell import ShellTask
    from datetime import timedelta
    
    
    def taskStateHandler(obj: Task, old_state, new_state):
        logger = prefect.context.get("logger")
        # you may need to briefly sleep here to make sure the task has been renamed, otherwise
        # the result may come back as 'None'
        import time
        time.sleep(10)
    
        # use the task run id to get task_run_name
        client = prefect.Client()
        task_run_id = prefect.context.get("task_run_id")
        task_run_name_info = client.graphql(
                    {
                        "query": {
                            with_args("task_run_by_pk", {"id": task_run_id}): {"name"}
                        }
                    }
                )
        # log using the task run name
        logger.info("Task {0} testing".format(task_run_name_info.data.task_run_by_pk.name))
        return new_state
    
    
    executeShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split(' ')}",
        stream_output=True, max_retries=2, retry_delay=timedelta(seconds=30), state_handlers=[taskStateHandler])
    
    with Flow("shell command task name") as flow:
        executeShell(command="echo 'test'")
    
    flow.register("test")
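
The comment in the handler above notes that the name may come back as 'None' if the task run has not been renamed yet, and works around it with a fixed 10-second sleep. One possible alternative is to poll a few times with a short delay. The helper below is a minimal sketch of that idea, not part of Prefect: `wait_for_name`, its parameters, and the `fetch_name` callable are all hypothetical names.

```python
import time
from typing import Callable, Optional


def wait_for_name(
    fetch_name: Callable[[], Optional[str]],
    attempts: int = 5,
    delay_seconds: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Call fetch_name until it returns a non-empty value or attempts run out."""
    for attempt in range(attempts):
        name = fetch_name()
        if name:
            return name
        # Only wait when another attempt will follow.
        if attempt < attempts - 1:
            sleep(delay_seconds)
    return None


if __name__ == "__main__":
    # Stand-in for the API query: empty twice, then a name.
    answers = iter([None, None, "job1.xml"])
    print(wait_for_name(lambda: next(answers), delay_seconds=0.1))  # job1.xml
```

In the handler above, `fetch_name` would wrap the `client.graphql(...)` call and return `task_run_name_info.data.task_run_by_pk.name`, so the handler waits only as long as the rename actually takes.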