Tony Yun
06/26/2024, 9:01 PM
task_run_name
but using auto-increment numbers? When I specify it, it loses that feature, and I can't find instructions for how to add it back.
Nate
06/26/2024, 9:51 PM
In [1]: from itertools import count
In [2]: c = count()
In [3]: from prefect import flow, task
In [4]: @task(task_run_name=lambda: f"this is run number {next(c)}")
...: def f():
...:     pass
...:
In [5]: @flow
...: def my_flow():
...:     f()
...:     f()
...:     f()
...:
In [6]: my_flow()
16:50:32.682 | INFO | prefect.engine - Created flow run 'complex-walrus' for flow 'my-flow'
16:50:33.285 | INFO | Task run 'this is run number 0' - Created task run 'this is run number 0' for task 'f'
16:50:33.593 | INFO | Task run 'this is run number 0' - Finished in state Completed()
16:50:33.906 | INFO | Task run 'this is run number 1' - Created task run 'this is run number 1' for task 'f'
16:50:34.250 | INFO | Task run 'this is run number 1' - Finished in state Completed()
16:50:34.532 | INFO | Task run 'this is run number 2' - Created task run 'this is run number 2' for task 'f'
16:50:34.838 | INFO | Task run 'this is run number 2' - Finished in state Completed()
16:50:34.974 | INFO | Flow run 'complex-walrus' - Finished in state Completed()
Tony Yun
06/27/2024, 3:56 PM
_count = count()
@task(task_run_name=lambda: "route-fax-{config.line_number}-" + f"{next(_count)}")
def route_fax(...):
    ...
route_task = route_fax.map(
    fax_objects,
    unmapped(flow_params.sort_strat),
    unmapped(config),
    unmapped(sfax_credentials),
    unmapped(flow_params.reprocess_failed),
)
for future in route_task:
    ...
Tony Yun
06/27/2024, 3:58 PM
11:56:56.676 | ERROR | Flow run 'fax-ocr-01/02-Tuesday' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
result = await flow_call.aresult()
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/flow_batch_earth.py", line 173, in fax_ocr_flow_batch
fax_ocr_batch(
File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/flow_batch_earth.py", line 123, in fax_ocr_batch
counter.increment(future.result())
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/futures.py", line 228, in result
return from_sync.call_soon_in_loop_thread(
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/futures.py", line 241, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 1530, in orchestrate_task_run
task_run_name = task.task_run_name.format(**resolved_parameters)
AttributeError: 'function' object has no attribute 'format'
Nate
06/27/2024, 4:11 PM
In [1]: from itertools import count
In [2]: from prefect import flow, task
In [3]: c = count()
In [4]: @task(task_run_name=lambda: f"this is run number {next(c)}")
...: def add(x, y):
...:     return x + y
...:
In [5]: from prefect import unmapped
...:
...: @flow
...: def adding():
...:     return [f.result() for f in add.map(range(10), unmapped(42))]
...:
In [6]: adding()
11:09:43.542 | INFO | prefect.engine - Created flow run 'arboreal-pudu' for flow 'adding'
...
11:09:44.575 | INFO | Task run 'this is run number 4' - Finished in state Completed()
11:09:44.576 | INFO | Task run 'this is run number 5' - Finished in state Completed()
11:09:44.610 | INFO | Task run 'this is run number 1' - Finished in state Completed()
11:09:44.611 | INFO | Task run 'this is run number 7' - Finished in state Completed()
11:09:44.727 | INFO | Task run 'this is run number 0' - Finished in state Completed()
11:09:44.728 | INFO | Task run 'this is run number 2' - Finished in state Completed()
11:09:44.729 | INFO | Task run 'this is run number 6' - Finished in state Completed()
11:09:44.732 | INFO | Task run 'this is run number 8' - Finished in state Completed()
11:09:44.734 | INFO | Task run 'this is run number 3' - Finished in state Completed()
11:09:44.887 | INFO | Task run 'this is run number 9' - Finished in state Completed()
11:09:45.017 | INFO | Flow run 'arboreal-pudu' - Finished in state Completed()
Out[6]: [42, 43, 44, 45, 46, 47, 48, 49, 50, 51]
in your example you're mixing a format string and a function, hence
AttributeError: 'function' object has no attribute 'format'
you have to choose between using a format string (not an f-string, but a format string) and a function
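To make the distinction concrete, here is a minimal plain-Python sketch (no Prefect required) of the two kinds of task_run_name value. `resolve_name` is a hypothetical helper written only for illustration, not Prefect's actual code; it assumes nothing beyond what the traceback shows, namely that a template string gets filled in with `.format(**parameters)`.

```python
from itertools import count

_count = count()


def resolve_name(task_run_name, parameters):
    """Hypothetical resolver: call a function, or format a template string."""
    if callable(task_run_name):
        return task_run_name()
    return task_run_name.format(**parameters)


# Option 1: a format string (a template, not an f-string), filled from the parameters.
template_name = resolve_name("got {value}", {"value": 42})

# Option 2: a function that builds the whole name itself.
function_names = [
    resolve_name(lambda: f"this is run number {next(_count)}", {}) for _ in range(3)
]

# Treating a function as if it were a template reproduces the error in the traceback.
try:
    (lambda: "name").format(value=42)
except AttributeError as exc:
    error_message = str(exc)

print(template_name)
print(function_names)
print(error_message)
```

Note that a placeholder such as {config.line_number} inside a normal string returned by a function is never substituted; it only has meaning when the string itself is used as the template.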
Tony Yun
06/27/2024, 4:56 PM
Tony Yun
06/28/2024, 6:08 PM
@task(task_run_name=lambda: f"this is run number {next(c)}")
but I still see this error. I suspect it's because our Prefect versions differ. I'm using 2.14, but I realized this might be too much for me to resolve.
14:06:12.827 | ERROR | Flow run 'fax-ocr-01/02-Tuesday' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
result = await flow_call.aresult()
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/flow_batch_earth.py", line 184, in fax_ocr_flow_batch_uat
fax_ocr_batch(
File "/Users/tonyyun/github/data-operations-v2/fax_ocr/flows/flow_batch_earth.py", line 127, in fax_ocr_batch
counter.increment(future.result())
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/futures.py", line 228, in result
return from_sync.call_soon_in_loop_thread(
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/usr/local/Cellar/python@3.9/3.9.17/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/futures.py", line 241, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/Users/tonyyun/.local/share/virtualenvs/fax_ocr-3iE9cg3R/lib/python3.9/site-packages/prefect/engine.py", line 1530, in orchestrate_task_run
task_run_name = task.task_run_name.format(**resolved_parameters)
AttributeError: 'function' object has no attribute 'format'
Nate
06/28/2024, 6:08 PM
Nate
06/28/2024, 6:13 PM
prefect version?
Tony Yun
06/28/2024, 6:13 PM
$ prefect version ‹ruby-2.6.4›
Version: 2.10.2
API version: 0.8.4
Python version: 3.9.17
Git commit: 16747be2
Built: Fri, Apr 7, 2023 10:19 AM
OS/Arch: darwin/x86_64
Profile: default
Server type: cloud
(fax_ocr-3iE9cg3R)
Tony Yun
06/28/2024, 6:13 PM
Tony Yun
06/28/2024, 6:18 PM
Nate
06/28/2024, 6:27 PM
In [1]: from prefect import flow, task
In [2]: @task(task_run_name="got {value}")
...: def f(value: str):
...:     pass
...:
In [3]: @flow
...: def g():
...:     f(42)
13:27:22.813 | INFO | Task run 'got 42' - Finished in state Completed()
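One possible way to get a name shaped like route-fax-<line number>-<n> with only a format string is to pass the sequence number in as an ordinary parameter. The sketch below shows just the string formatting in plain Python; `Config`, `index`, and the sample value 7 are hypothetical stand-ins, and whether a given Prefect version fills the template from the same parameter names should be checked against its documentation.

```python
from dataclasses import dataclass


@dataclass
class Config:
    line_number: int  # hypothetical stand-in for the real config object


# A plain template: str.format supports attribute lookups such as {config.line_number}.
TEMPLATE = "route-fax-{config.line_number}-{index}"

config = Config(line_number=7)

# Each call supplies its own index, so no shared counter is needed.
names = [TEMPLATE.format(config=config, index=i) for i in range(3)]
print(names)
```

With a mapped task, the index would presumably be supplied as one more mapped argument (for example a range the same length as the mapped inputs), alongside the unmapped config.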