Paul Reyna
01/24/2024, 7:44 PM
I'm unable to retrieve the flow name with FlowRunContext.get().flow.name in a Prefect flow that uses DaskTaskRunner with mapped tasks. If I use DaskTaskRunner without mapping a task, I can retrieve the flow name, but if I try to map a task, I get AttributeError: 'NoneType' object has no attribute 'flow'
Paul Reyna
01/24/2024, 7:45 PM
from prefect import flow, task
from prefect.context import FlowRunContext
from prefect_dask.task_runners import DaskTaskRunner

MAPPING = {"a": 1, "b": 2}

@task(name="Mapped task")
def mapped_task(value: str):
    print(f"flow name is: {FlowRunContext.get().flow.name}")

@flow(name="test flow",
      # task_runner=DaskTaskRunner(
      #     cluster_kwargs={"n_workers": 1, "threads_per_worker": 2})
      )
def test_flow():
    mapped_task.map(value=list(MAPPING.keys()))

if __name__ == "__main__":
    test_flow()

Commenting out the DaskTaskRunner within the flow decorator will cause the flow to run successfully
Paul Reyna
01/24/2024, 7:46 PM
13:42:14.758 | ERROR | Flow run 'talented-nightingale' - Finished in state Failed('2/2 states failed.')
Traceback (most recent call last):
  File "/Users/preyna/prog/ccde-batch-dataflows/flows/src/test_mapping.py", line 20, in <module>
    test_flow()
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 1120, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 291, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/Users/preyna/.pyenv/versions/3.10.13/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 394, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 2055, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/preyna/prog/ccde-batch-dataflows/flows/src/test_mapping.py", line 9, in mapped_task
    print(f"flow name is: {FlowRunContext.get().flow.name}")
AttributeError: 'NoneType' object has no attribute 'flow'
Paul Reyna
01/24/2024, 7:48 PM
Using DaskTaskRunner, but without mapping tasks, also runs successfully
Paul Reyna
01/24/2024, 8:34 PM
get_run_context or something
Note that the flow run context is not sent to distributed task workers by default because it is costly to serialize and deserialize. If you need to access the flow name in a distributed environment, such as when using the DaskTaskRunner, you would need to ensure that the relevant context information is passed to the task workers.