Nathan Atkins
05/03/2021, 6:04 PM
@task(task_run_name=name_fn)
where name_fn() dynamically generates the task name from the kwargs that are passed to it.
This all works great when I'm running with the UI. When I run directly by calling flow.run(), the set_task_run_name() in engine/task_runner.py is stubbed out and doesn't call my name_fn().
I can see that in TaskRunner.run(), getting the call to set_task_run_name() isn't totally straightforward.
What would it take to get set_task_run_name() to work when running directly without the UI?
Kevin Kho
name_fn looks like?
Nathan Atkins
05/03/2021, 6:21 PM
def mapped_dataset_task_name(**kwargs):
    name = kwargs["task_name"]
    dataset = kwargs["csvx"].source.rsplit(":", 1)[1].split("_")[0]
    mapped_name = f"{name}[{dataset}]"
    return mapped_name
That works great. Super happy with what I can do there.
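To make the behaviour concrete, here is a minimal sketch of calling the naming function by hand. The `SimpleNamespace` object and the `source` value `"warehouse:sales_2021"` are hypothetical stand-ins for the real `csvx` input, and `task_name` is assumed to arrive through the kwargs the same way the other values do.

```python
from types import SimpleNamespace


def mapped_dataset_task_name(**kwargs):
    # Same logic as the function above: take the text after the last ":"
    # in csvx.source and keep the part before the first "_".
    name = kwargs["task_name"]
    dataset = kwargs["csvx"].source.rsplit(":", 1)[1].split("_")[0]
    return f"{name}[{dataset}]"


# Hypothetical input object; only the .source attribute matters here.
csvx = SimpleNamespace(source="warehouse:sales_2021")

# Extra kwargs (such as map_index) are accepted and simply ignored.
example_name = mapped_dataset_task_name(task_name="load", csvx=csvx, map_index=0)
print(example_name)
```

Because the function takes `**kwargs`, it tolerates whatever additional keys are passed alongside the task inputs.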
In <https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/cloud/task_runner.py>
def set_task_run_name(self, task_inputs: Dict[str, Result]) -> None:
    """
    Sets the name for this task run by calling the `set_task_run_name` mutation.

    Args:
        - task_inputs (Dict[str, Result]): a dictionary of inputs whose keys correspond
            to the task's `run()` arguments.
    """
    task_run_name = self.task.task_run_name

    if task_run_name:
        raw_inputs = {k: r.value for k, r in task_inputs.items()}
        formatting_kwargs = {
            **prefect.context.get("parameters", {}),
            **prefect.context,
            **raw_inputs,
        }

        if not isinstance(task_run_name, str):
            task_run_name = task_run_name(**formatting_kwargs)
        else:
            task_run_name = task_run_name.format(**formatting_kwargs)

        self.client.set_task_run_name(
            task_run_id=self.task_run_id, name=task_run_name  # type: ignore
        )
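The template-or-callable branch in that method can be tried on its own, without Prefect. The sketch below is a standalone illustration, not the library's code: `resolve_task_run_name` and the sample kwargs are hypothetical names chosen for this example.

```python
from typing import Any, Callable, Dict, Union


def resolve_task_run_name(
    task_run_name: Union[str, Callable[..., str]],
    formatting_kwargs: Dict[str, Any],
) -> str:
    # A callable is invoked with the kwargs; a string is treated as a
    # str.format() template. This mirrors the isinstance check above.
    if not isinstance(task_run_name, str):
        return task_run_name(**formatting_kwargs)
    return task_run_name.format(**formatting_kwargs)


# Hypothetical values standing in for parameters, context, and task inputs.
sample_kwargs = {"task_name": "load", "dataset": "sales"}

from_template = resolve_task_run_name("{task_name}[{dataset}]", sample_kwargs)
from_callable = resolve_task_run_name(
    lambda **kw: f"{kw['task_name']}[{kw['dataset']}]", sample_kwargs
)
print(from_template, from_callable)
```

Both paths receive the same merged kwargs, so a template and a function can produce identical names; the function form just allows arbitrary logic.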
In <https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py>
def set_task_run_name(self, task_inputs: Dict[str, Result]) -> None:
    """
    Sets the name for this task run.

    Args:
        - task_inputs (Dict[str, Result]): a dictionary of inputs whose keys correspond
            to the task's `run()` arguments.
    """
    pass
So TaskRunner.run() never calls my function. set_task_run_name() is also called late in run(), so it won't be set before the Starting Task ... message.
Kevin Kho
Nathan Atkins
05/03/2021, 6:24 PM
if not isinstance(task_run_name, str):
    task_run_name = task_run_name(**formatting_kwargs)
else:
    task_run_name = task_run_name.format(**formatting_kwargs)
figures out if it should just do a straight string substitution or call the function. Took a bit to find that, but super powerful.
Kevin Kho
Nathan Atkins
05/03/2021, 6:27 PM
Kevin Kho
flow.run()? The task name is used to identify the run in the backend/UI, so it’s not something intended for local use.
Kevin Kho
Nathan Atkins
05/03/2021, 7:03 PM
Nathan Atkins
05/03/2021, 7:03 PM
Kevin Kho
Nathan Atkins
05/03/2021, 7:48 PM
self.logger.info(
    "Task '{name}': Finished task run for task with final state: "
    "'{state}'".format(
        name=context["task_full_name"], state=type(state).__name__
    )
)
If we could get the context updated from set_task_run_name(), then it would be updated in the log messages. The tricky one is the first Starting Task log message that happens before set_task_run_name() is called.
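A rough sketch of that ordering problem, using a plain dict in place of the real context. The `log_line` helper, the initial name `load[0]`, and the message wording are illustrative assumptions, not Prefect's actual logging code.

```python
# Plain dict standing in for the run context (assumption for illustration).
context = {"task_full_name": "load[0]"}


def log_line(event: str) -> str:
    # Formats a message from whatever name is in the context right now.
    return "Task '{name}': {event}".format(
        name=context["task_full_name"], event=event
    )


lines = [log_line("Starting task run...")]

# What a local set_task_run_name() could do: overwrite the name in context.
context["task_full_name"] = "load[sales]"

lines.append(log_line("Finished task run for task with final state: 'Success'"))

for line in lines:
    print(line)
```

Any message formatted after the update picks up the custom name, while the starting message keeps the default unless the name is computed earlier in the run.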
New information for you. There are times that it is nice to see the name even when the task completes successfully. It helps understand which tasks are taking a long time or how Dask is scheduling.
Kevin Kho
try-except statements that raise either a SUCCESS or FAILED state with the custom error message.
Nathan Atkins
05/03/2021, 2:50 PM
Kevin Kho
Nathan Atkins
05/04/2021, 2:53 PM