Kamil Gorszczyk
06/20/2021, 3:23 PMexecuteShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split('-file ')[1].split(' -')[0].rpartition('/')[-1]}",
stream_output=True, max_retries=RETRY_MAX, retry_delay=timedelta(minutes=RETRY_TIMEOUT), timeout=TIMEOUT, state_handlers=[taskStateHandler])
def taskStateHandler(obj: Task, old_state: State, new_state: State):
try:
if new_state.is_retrying():
sendTelegramNotification("Task {0} failed and is retrying at {1}".format(obj.task_run_name, new_state.start_time))
return new_state
except:
sendTelegramNotification("Task {0} failed with an exception".format(obj.name))
return new_state
Since the task_run_name is a lambda function, I just called task.task_run_name() to get the resolved name. But somehow, since a week or so, prefect raises the following exception:
Exception raised while calling state handlers: KeyError('command')
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/prefect/engine/cloud/task_runner.py", line 64, in call_runner_target_handlers
new_state = super().call_runner_target_handlers(
File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 114, in call_runner_target_handlers
new_state = handler(self.task, old_state, new_state) or new_state
File "prefect-master.py", line 200, in taskStateHandler
File "prefect-master.py", line 222, in <lambda>
KeyError: 'command'
I just can’t figure out what I’m missing or doing wrong… I even reverted to 0.14.19 where I thought it worked but same exception.Kevin Kho
06/21/2021, 1:19 PMZach Angell
06/21/2021, 4:30 PMtask.task_run_name
is context dependent. We use this callable to set the task run name in the backend but never store the task_run_name
property in the Python code
I'll open up an issue on GitHub, I think it makes sense to put this in prefect.context
.
For the time being, the workaround is to query the Prefect API to get the name. Please note this will only work when running against a backend
This code example is working for me
from prefect import Flow, task, Task
import prefect
from prefect.utilities.graphql import with_args
from prefect.tasks.shell import ShellTask
from datetime import timedelta
def taskStateHandler(obj: Task, old_state, new_state):
logger = prefect.context.get("logger")
# you may need to briefly sleep here to make sure the task has been renamed, otherwise
# the result may come back as 'None'
import time
time.sleep(10)
# use the task run id to get task_run_name
client = prefect.Client()
task_run_id = prefect.context.get("task_run_id")
task_run_name_info = client.graphql(
{
"query": {
with_args("task_run_by_pk", {"id": task_run_id}): {"name"}
}
}
)
# log using the task run name
<http://logger.info|logger.info>("Task {0} testing".format(task_run_name_info.data.task_run_by_pk.name))
return new_state
executeShell = ShellTask(task_run_name=lambda **kwargs: f"{kwargs['command'].split(' ')}",
stream_output=True, max_retries=2, retry_delay=timedelta(seconds=30) ,state_handlers=[taskStateHandler])
with Flow("shell command task name") as flow:
executeShell(command = "echo 'test'")
flow.register("test")