# ask-community
n
I have a mapped task and wanted to set the task name dynamically with

```python
@task(task_run_name=name_fn)
```

where `name_fn()` dynamically generates the task name from the kwargs that are passed to it. This all works great when I'm running with the UI. When I run directly by calling `flow.run()`, the `set_task_run_name()` in `engine/task_runner.py` is stubbed out and doesn't call my `name_fn()`. I can see that in `TaskRunner.run()`, getting the call to `set_task_run_name()` isn't totally straightforward. What would it take to get `set_task_run_name()` to work when running directly without the UI?
k
Hi @Nathan Atkins! Can you show me what `name_fn` looks like?
n
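```python
def mapped_dataset_task_name(**kwargs):
    # Base task name, taken from the formatting kwargs (Prefect context)
    name = kwargs["task_name"]
    # Dataset label: text after the last ":" in the source, up to the first "_"
    dataset = kwargs["csvx"].source.rsplit(":", 1)[1].split("_")[0]
    mapped_name = f"{name}[{dataset}]"
    return mapped_name
```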
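To see what a name callable like this produces for a single mapped value, here is a minimal sketch that calls it directly. The `csvx` stand-in (a `SimpleNamespace` with a `source` attribute) and the `"csv:sales_2020.csv"` source string are hypothetical; they only assume that `source` contains a colon followed by an underscore-delimited file name, which is what the string slicing expects.

```python
from types import SimpleNamespace


def mapped_dataset_task_name(**kwargs):
    # Same logic as the function above, repeated so this block runs on its own
    name = kwargs["task_name"]
    dataset = kwargs["csvx"].source.rsplit(":", 1)[1].split("_")[0]
    return f"{name}[{dataset}]"


# Hypothetical stand-in for one mapped `csvx` input
csvx = SimpleNamespace(source="csv:sales_2020.csv")
print(mapped_dataset_task_name(task_name="load", csvx=csvx))  # load[sales]
```

Because the function takes `**kwargs`, it tolerates the many extra context keys it will be handed and only reads the two it needs.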
That works great. Super happy with what I can do there. In
<https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/cloud/task_runner.py>:

```python
def set_task_run_name(self, task_inputs: Dict[str, Result]) -> None:
    """
    Sets the name for this task run by calling the `set_task_run_name` mutation.
    Args:
        - task_inputs (Dict[str, Result]): a dictionary of inputs whose keys correspond
            to the task's `run()` arguments.
    """
    task_run_name = self.task.task_run_name

    if task_run_name:
        raw_inputs = {k: r.value for k, r in task_inputs.items()}
        formatting_kwargs = {
            **prefect.context.get("parameters", {}),
            **prefect.context,
            **raw_inputs,
        }

        if not isinstance(task_run_name, str):
            task_run_name = task_run_name(**formatting_kwargs)
        else:
            task_run_name = task_run_name.format(**formatting_kwargs)

        self.client.set_task_run_name(
            task_run_id=self.task_run_id, name=task_run_name  # type: ignore
        )
```
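The dictionary unpacking in `formatting_kwargs` in the Cloud excerpt means later sources win when keys collide: flow parameters are overridden by context entries, which are overridden by the task's own inputs. A minimal plain-Python sketch of that precedence, using hypothetical dictionaries in place of `prefect.context` and the task inputs:

```python
# Hypothetical stand-ins for the three sources merged in set_task_run_name()
parameters = {"region": "us", "csvx": "from-parameters"}
context = {"task_name": "load", "parameters": parameters}
raw_inputs = {"csvx": "from-task-input"}

# Same merge order as the Cloud TaskRunner: later entries override earlier ones
formatting_kwargs = {
    **context.get("parameters", {}),
    **context,
    **raw_inputs,
}

print(formatting_kwargs["csvx"])       # from-task-input
print(formatting_kwargs["region"])     # us
print(formatting_kwargs["task_name"])  # load
```

So a mapped input named the same as a flow parameter is what the name function sees, which is usually what you want for per-element names.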
In <https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py>:

```python
def set_task_run_name(self, task_inputs: Dict[str, Result]) -> None:
    """
    Sets the name for this task run.
    Args:
        - task_inputs (Dict[str, Result]): a dictionary of inputs whose keys correspond
            to the task's `run()` arguments.
    """
    pass
```
So `TaskRunner.run()` never calls my function. `set_task_run_name()` is also called late in `run()`, so the name won't be set before the `Starting Task ...` message is logged.
k
I see. Did you look into templating names?
n
Yes, it is the same call. In the Cloud version,

```python
if not isinstance(task_run_name, str):
    task_run_name = task_run_name(**formatting_kwargs)
else:
    task_run_name = task_run_name.format(**formatting_kwargs)
```

figures out whether it should just do a straight string substitution or call the function. Took a bit to find that, but super powerful.
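That branch is what lets `task_run_name` be either a template string or a callable. A self-contained sketch of the same dispatch; `render_task_run_name` is a hypothetical helper name, not a Prefect function:

```python
from typing import Any, Callable, Union


def render_task_run_name(
    task_run_name: Union[str, Callable[..., str]], **formatting_kwargs: Any
) -> str:
    # Callables receive every formatting kwarg; strings are filled via str.format
    if not isinstance(task_run_name, str):
        return task_run_name(**formatting_kwargs)
    return task_run_name.format(**formatting_kwargs)


kwargs = {"task_name": "load", "dataset": "sales"}
print(render_task_run_name("{task_name}[{dataset}]", **kwargs))  # load[sales]
print(render_task_run_name(lambda **kw: kw["dataset"].upper(), **kwargs))  # SALES
```

Because `str.format` ignores unused keyword arguments, a template only has to mention the keys it needs; a callable, by contrast, must accept `**kwargs`, since it is handed the whole merged dictionary.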
k
Ah, OK, I see the edit to the original question. Let me look into it.
n
👍
k
What is your use case for naming the task in `flow.run()`? The task name is used to identify the run in the backend/UI, so it's not something intended for local use.
Is this for logging purposes?
n
When one of the mapped tasks fails, there isn't any visibility into which mapped value failed. I can put logging messages in to show which task is running, but it would be nice to have the same process for running with and without the UI. I'm trying to get a client to move towards Prefect by running the Prefect workflow from their Airflow task. It is tough if we can't figure out which of 150 mapped values caused the failure. I was modeling this on https://docs.prefect.io/core/idioms/task-run-names.html
I do see that the `cloud.TaskRunner` doesn't actually update the name in the context or on the task; it just sets the name through the client.
k
I know what you mean. Unfortunately, the task name is not used in the logger because the logger is created without some of the necessary templating information. What do you think of a custom failure message that includes the inputs to the task?
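One way to read this suggestion is a wrapper that catches any exception from the task body and re-raises it with the call's inputs in the message, so the log shows which mapped value failed. A plain-Python sketch; `include_inputs_on_failure` and `parse_count` are hypothetical names, and inside a Prefect task you would presumably raise `prefect.engine.signals.FAIL` with the same message instead of the `RuntimeError` used here.

```python
import functools
from typing import Any, Callable


def include_inputs_on_failure(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Re-raise any exception from `fn` with the call's inputs in the message."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            # Chain the original exception so its traceback is preserved
            raise RuntimeError(
                f"{fn.__name__} failed for args={args!r} kwargs={kwargs!r}: {exc}"
            ) from exc

    return wrapper


@include_inputs_on_failure
def parse_count(value: str) -> int:
    # Hypothetical task body that fails on non-numeric input
    return int(value)


try:
    parse_count(value="abc")
except RuntimeError as err:
    print(err)
```

Chaining with `from exc` keeps the original `ValueError` available as `__cause__`, so the extra context does not hide the real error.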
n
In `TaskRunner.run()`:

```python
self.logger.info(
    "Task '{name}': Finished task run for task with final state: "
    "'{state}'".format(
        name=context["task_full_name"], state=type(state).__name__
    )
)
```
If we could get the context updated from `set_task_run_name()`, then the name would show up in the log messages. The tricky one is the first `Starting Task` log message, which happens before `set_task_run_name()` is called. New information for you: there are times when it is nice to see the name even when the task completes successfully. It helps in understanding which tasks are taking a long time or how Dask is scheduling them.
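Outside Prefect's own logger, the idea of showing the rendered name on every log line, including the first "Starting" message, can be sketched with the standard `logging` and `contextvars` modules: resolve the name before anything is logged and inject it into each record with a filter. This is a plain-Python illustration under that assumption, not a change to Prefect's `TaskRunner`; `run_named`, `task_run_name_var`, and the `mapped_task_demo` logger are hypothetical names.

```python
import contextvars
import logging

# Rendered name of the task run currently executing ("-" when none)
task_run_name_var = contextvars.ContextVar("task_run_name", default="-")


class TaskRunNameFilter(logging.Filter):
    """Copy the current task run name onto every record the logger emits."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.task_run_name = task_run_name_var.get()
        return True


logger = logging.getLogger("mapped_task_demo")
logger.setLevel(logging.INFO)
logger.propagate = False  # avoid duplicate lines if the root logger is configured
logger.addFilter(TaskRunNameFilter())  # logger-level, so every handler sees the field

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s %(task_run_name)s: %(message)s"))
logger.addHandler(handler)


def run_named(name, fn, **kwargs):
    """Run fn(**kwargs) with `name` attached to all of its log lines."""
    # Set the name before the first log call so "Starting" already carries it
    token = task_run_name_var.set(name)
    try:
        logger.info("Starting task run...")
        result = fn(**kwargs)
        logger.info("Finished task run")
        return result
    finally:
        task_run_name_var.reset(token)  # don't leak the name into the next run


for dataset in ["sales", "returns"]:
    run_named(f"load[{dataset}]", lambda d: len(d), d=dataset)
```

Each line comes out as, for example, `INFO load[sales]: Starting task run...`, and the name is logged on successful runs as well as failures.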
k
Hey @Nathan Atkins, sorry about the delayed response. Unfortunately, this is not a use case we support, because Cloud is intended to do that. The best advice I can give for this situation is wrapping the code in `try`/`except` statements that raise either a `SUCCESS` or `FAIL` signal (resulting in a `Success` or `Failed` state) with the custom error message.
n
Thanks for looking into it, Kevin. I have it covered with logging currently.
k
Glad you have it covered. Sorry about that.
n
I'll spend some time looking at the `@task` decorator to see what I can do. It seems like it would be good to be able to get a name other than `fn_name[idx]` in the logs for both Cloud and Server.