Ouail Bendidi

04/05/2023, 4:44 PM
Hey everyone, I'm getting some weird behavior between running a Prefect flow locally and running it on `DockerContainer` infra on a Prefect agent. I'm trying to pass an instance of an object as an argument to my first task; the object happens to be a dataclass, though a fairly complex one. Locally it works fine, but on the Prefect agent the engine tries to resolve my object and create a new instance of it before passing it to the task!? I'm on Prefect 2.9.0 and Python 3.11 (details about the error in the thread ⬇️)
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1132, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/futures.py", line 240, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.11/site-packages/prefect/task_runners.py", line 207, in submit
    result = await call()
             ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1369, in begin_task_run
    state = await orchestrate_task_run(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1459, in orchestrate_task_run
    resolved_parameters = await resolve_inputs(parameters)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1852, in resolve_inputs
    return visit_collection(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 331, in visit_collection
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 331, in <listcomp>
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
                               ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 273, in visit_nested
    return visit_collection(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 337, in visit_collection
    result = typ(**items) if return_data else None
             ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kili/client.py", line 104, in __init__
    raise AuthenticationFailed(api_key, api_endpoint)
To reproduce:
from kili.client import Kili
from prefect import task, flow

@task
def my_task(*, kili: Kili):
  return kili.users()

@flow
async def my_flow():
  kili = Kili(api_key="xxxx")
  result = my_task(kili=kili)

Zanie

04/05/2023, 4:51 PM
Is that object a dataclass or pydantic model?

Ouail Bendidi

04/05/2023, 4:51 PM
Yes, it's an instance of a dataclass

Zanie

04/05/2023, 4:51 PM
from prefect.utilities.annotations import quote; my_task(kili=quote(kili))
If you quote it we will not look for Prefect futures in it
Otherwise yeah we traverse dataclasses to resolve futures by default
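To illustrate why traversing a dataclass ends up calling its constructor again (the `typ(**items)` frame in the traceback above), here is a minimal sketch that does not use Prefect at all. `FakeClient` and `rebuild` are hypothetical names made up for this illustration; `rebuild` only mimics the general idea of walking the fields and re-instantiating the type, it is not Prefect's actual code.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any

# Records every time the constructor runs, so the second call is visible.
CONSTRUCTED: list[str] = []


@dataclass
class FakeClient:
    """Hypothetical stand-in for a client whose constructor has side effects."""

    api_key: str

    def __post_init__(self) -> None:
        # A real client might authenticate against a remote API here.
        CONSTRUCTED.append(self.api_key)


def rebuild(obj: Any) -> Any:
    """Walk a dataclass instance and build a fresh instance from its fields."""
    if is_dataclass(obj) and not isinstance(obj, type):
        items = {f.name: rebuild(getattr(obj, f.name)) for f in fields(obj)}
        return type(obj)(**items)  # the constructor runs a second time here
    return obj


client = FakeClient(api_key="xxxx")
copy = rebuild(client)
```

After this runs, `copy` is a new object equal to `client`, and the constructor has run twice. If the constructor performs a network call or validates credentials, the second call is where a failure such as `AuthenticationFailed` could surface.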

Ouail Bendidi

04/05/2023, 4:53 PM
any idea why it's not the same behavior when running the flow locally ?

Zanie

04/05/2023, 4:55 PM
No that part is really weird.
Presumably some sort of detail of the error itself. We’re almost certainly traversing the object in the same way in both cases.
Different Prefect versions?

Ouail Bendidi

04/05/2023, 4:56 PM
nope, exactly the same (I use poetry to pin versions)
Using `quote` makes the `task.map` fail:
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1030, in begin_task_map
    raise MappingLengthMismatch(
prefect.exceptions.MappingLengthMismatch: Received iterable parameters with different lengths. Parameters for map must all be the same length. Got lengths: {'kili': 1, 'project': 10}
I should probably use `unmapped`? Example code:
from kili.client import Kili
from prefect import task, flow
from prefect.utilities.annotations import quote

@task
def my_task(*, kili: Kili) -> list[dict]:
  return kili.users()

@task
def my_other_task(*, kili: Kili, user: dict):
  print(user)
  return user["id"]

@flow
async def my_flow():
  kili = Kili(api_key="xxxx")
  users = my_task(kili=quote(kili))
  my_other_task.map(kili=quote(kili), user=users)
(this time it fails both locally and on the agent^^)

Zanie

04/05/2023, 5:08 PM
Oh yeah I guess you need `unmapped` now
That’s a side effect of `quote` inheriting from tuple for serialization with Dask
We should make it `unmapped` by default
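A small sketch of why a tuple-based wrapper shows up with length 1 in the `MappingLengthMismatch` message. `FakeQuote` and `mapped_lengths` are hypothetical stand-ins written for this illustration, assuming only what is said above (that the wrapper inherits from tuple); they are not Prefect's implementation.

```python
from collections import namedtuple
from typing import Any


class FakeQuote(namedtuple("FakeQuote", ["value"])):
    """Hypothetical one-field, tuple-based wrapper around a value."""


def mapped_lengths(**parameters: Any) -> dict[str, int]:
    """Collect the length of every parameter that supports len()."""
    lengths = {}
    for name, value in parameters.items():
        try:
            lengths[name] = len(value)
        except TypeError:
            # Values without a length would be treated as static.
            pass
    return lengths


client = object()  # has no len() on its own
lengths = mapped_lengths(kili=FakeQuote(client), project=list(range(10)))
```

Because the wrapper is itself a one-element tuple, it looks like an iterable of length 1 next to a ten-element list, which matches the lengths reported in the error. In the flow above, the combination being suggested would presumably be `my_other_task.map(kili=unmapped(quote(kili)), user=users)`, with `unmapped` imported from `prefect`.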

Ouail Bendidi

04/05/2023, 5:21 PM
Cool, thanks for the fix. I'll be implementing a wrapper for now, because it can be a hassle to keep track of all instances of `kili` in all pipelines and add `quote` and `unmapped` everywhere (because for some reason kili decided to make their client a dataclass 😓)
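import typing as tp

from kili.client import Kili

class KiliWrapper:
    """Plain (non-dataclass) proxy that forwards attribute access to the Kili client."""
    def __init__(self, kili: Kili) -> None:
        self._kili = kili

    def __getattr__(self, name: str) -> tp.Any:
        return getattr(self._kili, name)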
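A self-contained sketch of the wrapper idea, using a hypothetical `FakeClient` dataclass in place of the real Kili client so it runs without any dependencies. The point it shows is that the wrapper's own type is not a dataclass, so a check like `dataclasses.is_dataclass` no longer matches, while method calls still reach the wrapped client. The underscore guard in `__getattr__` is an addition of this sketch (an assumption, not part of the snippet above): it avoids infinite recursion if the wrapper is ever copied or unpickled before `_client` is set.

```python
from dataclasses import dataclass, is_dataclass
from typing import Any


@dataclass
class FakeClient:
    """Hypothetical stand-in for a dataclass-based API client."""

    api_key: str

    def users(self) -> list[dict]:
        return [{"id": "u1"}]


class ClientWrapper:
    """Plain class that delegates public attribute access to the client."""

    def __init__(self, client: FakeClient) -> None:
        self._client = client

    def __getattr__(self, name: str) -> Any:
        # Only called when normal lookup fails. Private and dunder names
        # are not delegated, which prevents recursion on self._client.
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._client, name)


wrapped = ClientWrapper(FakeClient(api_key="xxxx"))
users = wrapped.users()
```

Here `is_dataclass(wrapped)` is `False` while `wrapped.users()` and `wrapped.api_key` still resolve through the wrapped client. Whether this is enough to stop the engine from traversing the object depends on how it detects dataclasses, so treat it as a workaround to verify rather than a guarantee.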