Hi, is there a way to customize `task_run_name` bu...
# ask-community
t
Hi, is there a way to customize `task_run_name` but using auto-increment numbers? When I specify it, it loses the auto-increment feature, and I can't find instructions on how to add it back.
n
hi @Tony Yun - what about something like this?
In [1]: from itertools import count

In [2]: c = count()

In [3]: from prefect import flow, task

In [4]: @task(task_run_name=lambda: f"this is run number {next(c)}")
   ...: def f():
   ...:     pass
   ...:

In [5]: @flow
   ...: def my_flow():
   ...:     f()
   ...:     f()
   ...:     f()
   ...:

In [6]: my_flow()
16:50:32.682 | INFO    | prefect.engine - Created flow run 'complex-walrus' for flow 'my-flow'
16:50:33.285 | INFO    | Task run 'this is run number 0' - Created task run 'this is run number 0' for task 'f'
16:50:33.593 | INFO    | Task run 'this is run number 0' - Finished in state Completed()
16:50:33.906 | INFO    | Task run 'this is run number 1' - Created task run 'this is run number 1' for task 'f'
16:50:34.250 | INFO    | Task run 'this is run number 1' - Finished in state Completed()
16:50:34.532 | INFO    | Task run 'this is run number 2' - Created task run 'this is run number 2' for task 'f'
16:50:34.838 | INFO    | Task run 'this is run number 2' - Finished in state Completed()
16:50:34.974 | INFO    | Flow run 'complex-walrus' - Finished in state Completed()
t
Thanks @Nate. But this doesn't seem to work for async future run loops:
_count = count()


@task(task_run_name=lambda: "route-fax-{config.line_number}-" + f"{next(_count)}")
def route_fax(...)
...

        route_task = route_fax.map(
            fax_objects,
            unmapped(flow_params.sort_strat),
            unmapped(config),
            unmapped(sfax_credentials),
            unmapped(flow_params.reprocess_failed),
        )

        for future in route_task:
            ...
11:56:56.676 | ERROR   | Flow run 'fax-ocr-01/02-Tuesday' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/flow_batch_earth.py", line 173, in fax_ocr_flow_batch
    fax_ocr_batch(
  File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/flow_batch_earth.py", line 123, in fax_ocr_batch
    counter.increment(future.result())
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/futures.py", line 228, in result
    return from_sync.call_soon_in_loop_thread(
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 1530, in orchestrate_task_run
    task_run_name = task.task_run_name.format(**resolved_parameters)
AttributeError: 'function' object has no attribute 'format'
n
this works fine
In [1]: from itertools import count

In [2]: from prefect import flow, task

In [3]: c = count()

In [4]: @task(task_run_name=lambda: f"this is run number {next(c)}")
   ...: def add(x, y):
   ...:     return x + y
   ...:

In [5]: from prefect import unmapped
   ...:
   ...: @flow
   ...: def adding():
   ...:     return [f.result() for f in add.map(range(10), unmapped(42))]
   ...:

In [6]: adding()
11:09:43.542 | INFO    | prefect.engine - Created flow run 'arboreal-pudu' for flow 'adding'
...
11:09:44.575 | INFO    | Task run 'this is run number 4' - Finished in state Completed()
11:09:44.576 | INFO    | Task run 'this is run number 5' - Finished in state Completed()
11:09:44.610 | INFO    | Task run 'this is run number 1' - Finished in state Completed()
11:09:44.611 | INFO    | Task run 'this is run number 7' - Finished in state Completed()
11:09:44.727 | INFO    | Task run 'this is run number 0' - Finished in state Completed()
11:09:44.728 | INFO    | Task run 'this is run number 2' - Finished in state Completed()
11:09:44.729 | INFO    | Task run 'this is run number 6' - Finished in state Completed()
11:09:44.732 | INFO    | Task run 'this is run number 8' - Finished in state Completed()
11:09:44.734 | INFO    | Task run 'this is run number 3' - Finished in state Completed()
11:09:44.887 | INFO    | Task run 'this is run number 9' - Finished in state Completed()
11:09:45.017 | INFO    | Flow run 'arboreal-pudu' - Finished in state Completed()
Out[6]: [42, 43, 44, 45, 46, 47, 48, 49, 50, 51]
in your example you're mixing a format string and a function, hence `AttributeError: 'function' object has no attribute 'format'`
you have to choose between using a format string (not an f-string, but a format string) and a function
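To illustrate the distinction above, here is a minimal pure-Python sketch. `resolve_name` is a hypothetical helper written for this example, not Prefect's implementation; it only mimics the idea that a run name is either a template filled from parameters or a callable that gets called. The last case shows what happens when the two are mixed: the braces in the non-f-string part are never filled in.

```python
from itertools import count

_count = count()
_mixed_count = count()


def resolve_name(name_spec, parameters):
    """Hypothetical resolver (not Prefect's code): call it or format it."""
    if callable(name_spec):
        # A function is simply called; it receives no parameters here.
        return name_spec()
    # A plain format string is filled from the parameters.
    return name_spec.format(**parameters)


# Option 1: a format string (placeholders are filled from parameters).
template = "route-fax-{line_number}"

# Option 2: a function (the f-string is evaluated on each call).
def numbered():
    return f"route-fax-{next(_count)}"

# Mixed: the first part is a plain string, so its braces stay literal.
def mixed():
    return "route-fax-{config.line_number}-" + f"{next(_mixed_count)}"


names = [
    resolve_name(template, {"line_number": 3}),
    resolve_name(numbered, {}),
    resolve_name(numbered, {}),
    resolve_name(mixed, {}),
]
print(names)
```

With a function, nothing formats the returned string afterwards, so any value such as `config.line_number` would have to be interpolated inside the function itself.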
t
ok thanks! will try it out.
I copy-pasted your task_run_name
@task(task_run_name=lambda: f"this is run number {next(c)}")
but I still see this error. It's probably because of a difference in our Prefect versions. I'm using 2.14, but I realize this might be too much for me to resolve.
14:06:12.827 | ERROR   | Flow run 'fax-ocr-01/02-Tuesday' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/flow_batch_earth.py", line 184, in fax_ocr_flow_batch_uat
    fax_ocr_batch(
  File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/flow_batch_earth.py", line 127, in fax_ocr_batch
    counter.increment(future.result())
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/futures.py", line 228, in result
    return from_sync.call_soon_in_loop_thread(
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 1530, in orchestrate_task_run
    task_run_name = task.task_run_name.format(**resolved_parameters)
AttributeError: 'function' object has no attribute 'format'
n
oh interesting let me try an example on your version
hmm can you show the exact output of `prefect version`?
t
$ prefect version
Version:             2.10.2
API version:         0.8.4
Python version:      3.9.17
Git commit:          16747be2
Built:               Fri, Apr 7, 2023 10:19 AM
OS/Arch:             darwin/x86_64
Profile:             default
Server type:         cloud
*sorry it’s not 2.14 locally
Also, I tried to see how the number accumulation is handled in the source code, but couldn't find it on GitHub.
n
ah so we don't actually support using functions in 2.10.x. you can only template in parameter values, like this:
In [1]: from prefect import flow, task

In [2]: @task(task_run_name="got {value}")
   ...: def f(value: str):
   ...:     pass
   ...:

In [3]: @flow
   ...: def g():
   ...:     f(42)
   ...:

In [4]: g()
13:27:22.813 | INFO    | Task run 'got 42' - Finished in state Completed()
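Since 2.10.x only fills the template from parameter values (the traceback shows `task.task_run_name.format(**resolved_parameters)`), one possible workaround is to make the counter an explicit parameter. The sketch below is pure Python and only reproduces that `.format` call; `Config`, `index`, `render` and the sample values are hypothetical names invented for illustration, and the approach has not been tested against 2.10.2.

```python
from dataclasses import dataclass


@dataclass
class Config:
    # Hypothetical stand-in for the `config` object in the thread.
    line_number: int


# A plain format string (not an f-string): attribute access such as
# {config.line_number} is supported by str.format.
TEMPLATE = "route-fax-{config.line_number}-{index}"


def render(template, **parameters):
    # Same kind of call as in the traceback: format with the parameters.
    # Parameters the template does not mention are ignored.
    return template.format(**parameters)


fax_objects = ["fax-a", "fax-b", "fax-c"]  # placeholder inputs
config = Config(line_number=12)

# enumerate supplies the auto-increment number as an ordinary parameter.
run_names = [
    render(TEMPLATE, fax=fax, config=config, index=i)
    for i, fax in enumerate(fax_objects)
]
print(run_names)
```

In a flow, this would presumably mean adding an `index` parameter to the task and mapping over `range(len(fax_objects))` alongside `fax_objects`, keeping `config` wrapped in `unmapped`.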