Prefect 2.6.8 -> I'm getting `AttributeError: '...
# prefect-community
j
Prefect 2.6.8 -> I'm getting
AttributeError: 'unmapped' object has no attribute 'get'
when using a
.map()
on a task. I'm passing a
dict
. Upstream flows that unmap this object does not throw any error. Any thoughts on what a solution could be?
n
Hi @Joshua Grant! Could you share the code where you're doing this?
j
The code is quite complicated, but in pseudo-code:
Copy code
@flow(name='flow-name', task_runner=DaskTaskRunner())
def my_flow(parameters: dict, context: dict):
   get_file_ids = get_file_ids_from_svc(
          group_file_id=parameters.get('group_file_id'))
   create_entries = make_entries.map(
      file_id=get_file_ids,
      context=unmapped(context),  # this works
)
    update_service = update_servic_call.map(
       entry=create_entries,
       context=unmapped(context), # does not work
)
n
hmm, can you share the signature (args with types) for the task that fails to map?
j
Copy code
update_servic_call(entry: str, context: dict) -> dict:
🙏 1
👀 1
This is a simplified version, we use
unmapped(context)
in other tasks before it hits this terminal task
n
hmm, does this example (which seems to work) differ from yours? (also tried with dask in case that was related, doesnt seem to be)
Copy code
@task(log_prints=True)
def update(entry: str, context: dict) -> dict:
   print(entry, context)

@flow
def my_flow():
   context = {"a": 1, "b": 2}
   entries = [c for c in 'abcde']
   update.map(entry=entries, context=unmapped(context))
j
not exactly, context is not modified at all
n
sorry, I'm not sure I understand what you mean
j
the only difference I can tell is that the context is passed as a parameter from to the flow
sorry, had another fire pop up. I'm getting `task() got an unexpected keyword argument 'log_prints'
n
ahh, yeah I can't imagine where
context
comes from would matter so much as long its a valid dict ``task() got an unexpected keyword argument 'log_prints'` I would guess this is a prefect version thing, what does
prefect version
show?
j
Copy code
Version:             2.6.8
API version:         0.8.3
Python version:      3.9.16
Git commit:          68044e28
Built:               Thu, Nov 17, 2022 3:19 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         <client error>
This is local
n
j
yeah, we were running 2.7.3, but had to downgrade due to an issue I have since forgotten
This is probably more helpful:
Copy code
Encountered exception during execution:
Traceback (most recent call last):
  File "/opt/bitnami/python/lib/python3.9/site-packages/platform_flow_common/libs/error_handling.py", line 154, in wrapper
    return wrapped(*args, **kwargs_minus_handler)
  File "/opt/bitnami/python/lib/python3.9/site-packages/platform_flow_common/tasks/load/rgi/file_media_types.py", line 24, in load_file_media_type_to_rgi
    load_file_media_type(file_id=file_id, media_type=media_type, context=context)
  File "/opt/bitnami/python/lib/python3.9/site-packages/platform_flow_common/tasks/load/rgi/file_media_types.py", line 39, in load_file_media_type
    response = make_rgi_request(
  File "/opt/bitnami/python/lib/python3.9/site-packages/platform_flow_common/libs/request.py", line 51, in make_rgi_request
    organization_id = context.get("organization_id")
AttributeError: 'unmapped' object has no attribute 'get'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "flow_file.py", line 212, in preliminary_flow
    new_submitted_file_media_type = wrapped_load_file_media_type_to_rgi(
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/tasks.py", line 360, in __call__
    return enter_task_run_engine(
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 733, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/bitnami/python/lib/python3.9/site-packages/platform_flow_common/libs/error_handling.py", line 171, in wrapper
    raise result from ex
platform_flow_common.exceptions.error_handling.AuditLevelException: {'error_code': 'ERR_0200', 'error_msg': "'unmapped' object has no attribute 'get'"}
I see references to another var
context
unrelated to the one we are passing
n
ahhh have you subclassed
task
? or maybe added your own decorator that wraps it?
j
decorator that wraps it, yes, an
ErrorWrapper
that catches exceptions and logs them to another system
oddly though, all other tasks (~45) before this one do the same thing with no issues
so how we do it:
Copy code
from error_wrappers import ErrorWrapper
from tasks.this_task import special_task

error_wrapper = ErrorWrapper()

wrapped_special_task = error_wrapper(special_task)
n
yes that is a bit odd, it seems that one way or another, inside the scope of
update_servic_call
,
context
is being interpreted as a literal
unmapped
object instead of a dict 🤔
j
Did you see anything that stood out in the stack trace?
Here is the relevant portion of the flow:
Copy code
new_submitted_file_media_type = wrapped_load_file_media_type_to_rgi(
        file_id=file_record.get('id'),
        media_type=submitted_file_media_type,
        context=unmapped(context),
        wait_for=[new_s3_files],
    )
n
wait, should that be
Copy code
new_submitted_file_media_type = wrapped_load_file_media_type_to_rgi.map( # <-----
        file_id=file_record.get('id'),
        media_type=submitted_file_media_type,
        context=unmapped(context),
        wait_for=[new_s3_files],
    )
j
lol, yep, and I approved this PR facepalm
n
😄 it happens
🌮 1
j
Thanks @Nate
n
sure thing! anytime