~hello, hello, had a question around getting the f...
# ask-community
p
hello, hello, had a question around getting the flow name with
FlowRunContext.get().flow.name
in a Prefect flow that uses
DaskTaskRunner
with mapped tasks. If I just use
DaskTaskRunner
without mapping task, I can retrieve the flow name, but if I try to map a task, then I get
AttributeError: 'NoneType' object has no attribute 'flow'
EDIT: got a suggestion
sample code:
Copy code
from prefect import flow, task
from prefect.context import FlowRunContext
from prefect_dask.task_runners import DaskTaskRunner

MAPPING = {"a": 1, "b": 2}

@task(name="Mapped task")
def mapped_task(value:str):
    print(f"flow name is: {FlowRunContext.get().flow.name}")

@flow(name="test flow",
    #   task_runner=DaskTaskRunner(
    #     cluster_kwargs={"n_workers": 1, "threads_per_worker": 2})
)
def test_flow():
    mapped_task.map(value=list(MAPPING.keys()))

if __name__ == "__main__":
    test_flow()
commenting out the
DaskTaskRunner
within the
flow
decorator will cause the flow to run successfully
and having it in gives this error
Copy code
13:42:14.758 | ERROR   | Flow run 'talented-nightingale' - Finished in state Failed('2/2 states failed.')
Traceback (most recent call last):
  File "/Users/preyna/prog/ccde-batch-dataflows/flows/src/test_mapping.py", line 20, in <module>
    test_flow()
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/flows.py", line 1120, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 291, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/Users/preyna/.pyenv/versions/3.10.13/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 394, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 2055, in orchestrate_task_run
    result = await call.aresult()
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/preyna/Library/Caches/pypoetry/virtualenvs/flows-x5humnCY-py3.10/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 318, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/preyna/prog/ccde-batch-dataflows/flows/src/test_mapping.py", line 9, in mapped_task
    print(f"flow name is: {FlowRunContext.get().flow.name}")
AttributeError: 'NoneType' object has no attribute 'flow'
as a side note, using
DaskTaskRunner
, but without mapping tasks also runs successfully
looks like https://prefect-community.slack.com/archives/C04DZJC94DC/p1706126662825379 should get me going, probably going to end up passing the flow info along with
get_run_context
or something
Note that the flow run context is not sent to distributed task workers by default because it is costly to serialize and deserialize. If you need to access the flow name in a distributed environment, such as when using the
DaskTaskRunner
, you would need to ensure that the relevant context information is passed to the task workers.