Ouail Bendidi
04/05/2023, 4:44 PM
… `DockerContainer` infra on a Prefect agent. So I'm trying to pass an instance of an object as an argument to my first task; this object happens to be a dataclass, but a more complex one.
Locally it works fine, but on the Prefect agent the engine tries to resolve my object and create a new instance of it before passing it to the task!?
I'm on Prefect 2.9.0 and Python 3.11 (details about the error in the thread ⬇️)

```
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1132, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/futures.py", line 240, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.11/site-packages/prefect/task_runners.py", line 207, in submit
    result = await call()
             ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1369, in begin_task_run
    state = await orchestrate_task_run(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1459, in orchestrate_task_run
    resolved_parameters = await resolve_inputs(parameters)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1852, in resolve_inputs
    return visit_collection(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 331, in visit_collection
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 331, in <listcomp>
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
                               ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 273, in visit_nested
    return visit_collection(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 337, in visit_collection
    result = typ(**items) if return_data else None
             ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kili/client.py", line 104, in __init__
    raise AuthenticationFailed(api_key, api_endpoint)
```
```python
from kili.client import Kili
from prefect import task, flow


@task
def my_task(*, kili: Kili):
    return kili.users()


@flow
async def my_flow():
    kili = Kili(api_key="xxxx")
    result = my_task(kili=kili)
```
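The last frames of the traceback above show the mechanism. While resolving task inputs, `resolve_inputs` calls `visit_collection`, which walks the parameters dict. For a dataclass it then builds a fresh instance with `typ(**items)`, so `Kili.__init__` runs a second time, and on the agent that second run raises `AuthenticationFailed`. Below is a simplified, Prefect-free model of that behaviour. `resolve` and `Client` are hypothetical stand-ins for `visit_collection` and the Kili client, not Prefect's or Kili's actual code:

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any

INIT_CALLS = 0  # counts how many times a Client gets initialised


@dataclass
class Client:
    """Hypothetical stand-in for a client dataclass whose __init__ has side effects."""

    api_key: str

    def __post_init__(self) -> None:
        global INIT_CALLS
        INIT_CALLS += 1  # a real client might authenticate here and raise on failure


def resolve(expr: Any) -> Any:
    """Simplified model of a visitor that rebuilds the containers it walks through."""
    if isinstance(expr, dict):
        return {resolve(k): resolve(v) for k, v in expr.items()}
    if is_dataclass(expr) and not isinstance(expr, type):
        items = {f.name: resolve(getattr(expr, f.name)) for f in fields(expr)}
        return type(expr)(**items)  # builds a new instance, re-running __init__
    return expr


client = Client(api_key="xxxx")
resolved = resolve({"kili": client})
print(INIT_CALLS)                  # 2: the original construction plus the rebuild
print(resolved["kili"] is client)  # False: the task would receive a new instance
```

Any side effect in the dataclass's `__init__` therefore happens again inside the worker process, whatever environment that process runs in.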

Zanie
04/05/2023, 4:51 PM

Ouail Bendidi
04/05/2023, 4:51 PM

Zanie
04/05/2023, 4:51 PM
```python
from prefect.utilities.annotations import quote

my_task(kili=quote(kili))
```
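`quote` marks a value so that Prefect does not introspect it when resolving task inputs, and the object reaches the task unchanged. The model below extends a simplified resolver with a hypothetical `Quote` marker, written as a one-field tuple. Prefect's real `quote` lives in `prefect.utilities.annotations`, and its internals may differ from this sketch:

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, NamedTuple


class Quote(NamedTuple):
    """Hypothetical marker: a one-field tuple holding a value that must not be visited."""

    value: Any

    def unwrap(self) -> Any:
        return self.value


@dataclass
class Client:
    """Hypothetical stand-in for the Kili client dataclass."""

    api_key: str


def resolve(expr: Any) -> Any:
    """Simplified resolver: rebuilds dicts and dataclasses, but stops at a Quote."""
    if isinstance(expr, Quote):
        return expr.unwrap()  # hand back the original object untouched
    if isinstance(expr, dict):
        return {k: resolve(v) for k, v in expr.items()}
    if is_dataclass(expr) and not isinstance(expr, type):
        return type(expr)(**{f.name: resolve(getattr(expr, f.name)) for f in fields(expr)})
    return expr


client = Client(api_key="xxxx")
print(resolve({"kili": client})["kili"] is client)         # False: rebuilt
print(resolve({"kili": Quote(client)})["kili"] is client)  # True: passed through
```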

Ouail Bendidi
04/05/2023, 4:53 PM

Zanie
04/05/2023, 4:55 PM

Ouail Bendidi
04/05/2023, 4:56 PM
`quote` makes the `task.map` fail:
```
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1030, in begin_task_map
    raise MappingLengthMismatch(
prefect.exceptions.MappingLengthMismatch: Received iterable parameters with different lengths. Parameters for map must all be the same length. Got lengths: {'kili': 1, 'project': 10}
```
I should probably use `unmapped`?
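The length of 1 reported for `kili` fits Zanie's explanation further down the thread: `quote` inherits from `tuple`, so `map` sees a quoted value as an iterable with one element. Below is a simplified model of how `map` splits its parameters and how an `unmapped`-style wrapper removes a parameter from the length check. `Quote`, `Unmapped`, and `split_map_parameters` are hypothetical names, not Prefect's implementation. The model also treats every non-`Unmapped` parameter as iterable, which is a simplification:

```python
from typing import Any, NamedTuple


class Quote(NamedTuple):
    """Hypothetical stand-in for a tuple-based `quote` annotation."""

    value: Any


class Unmapped(NamedTuple):
    """Hypothetical stand-in for `unmapped`: pass the same value to every call."""

    value: Any


def split_map_parameters(**params: Any) -> list[dict[str, Any]]:
    """Simplified model of how a `.map` call fans parameters out into per-call kwargs."""
    # Unmapped values are broadcast as-is; a Quote inside is left for later unwrapping.
    static = {k: v.value for k, v in params.items() if isinstance(v, Unmapped)}
    # Everything else is iterated, including a bare Quote, because it is a tuple.
    iterable = {k: list(v) for k, v in params.items() if not isinstance(v, Unmapped)}
    lengths = {k: len(v) for k, v in iterable.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError(
            "Parameters for map must all be the same length. "
            f"Got lengths: {lengths}"
        )
    n = next(iter(lengths.values()), 0)
    return [{**static, **{k: v[i] for k, v in iterable.items()}} for i in range(n)]


client = object()  # stands in for the Kili client
users = [{"id": i} for i in range(10)]

try:
    split_map_parameters(kili=Quote(client), user=users)
except ValueError as exc:
    print(exc)  # ... Got lengths: {'kili': 1, 'user': 10}

calls = split_map_parameters(kili=Unmapped(Quote(client)), user=users)
print(len(calls))  # 10, and every call gets the same Quote(client)
```

In this model, wrapping the quoted value as `Unmapped(Quote(client))` broadcasts it to every call. A real engine would still have to unwrap the inner quote before calling the task.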
Example code:
```python
from kili.client import Kili
from prefect import task, flow
from prefect.utilities.annotations import quote


@task
def my_task(*, kili: Kili) -> list[dict]:
    return kili.users()


@task  # `.map` is only available on tasks
def my_other_task(*, kili: Kili, user: dict):
    print(user)
    return user["id"]


@flow
async def my_flow():
    kili = Kili(api_key="xxxx")
    users = my_task(kili=quote(kili))
    # Fails with MappingLengthMismatch: map treats quote(kili) as an iterable of length 1
    my_other_task.map(kili=quote(kili), user=users)
```
(this time it fails both locally and on the agent ^^)

Zanie
04/05/2023, 5:08 PM
… `unmapped` now
… `quote` inheriting from tuple for serialization with Dask
… `unmapped` by default

Ouail Bendidi
04/05/2023, 5:21 PM
… `kili` in all pipelines and add `quote` and `unmapped` everywhere (because for some reason Kili decided to make their client a dataclass 😓)
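```python
import typing as tp

from kili.client import Kili


class KiliWrapper:
    def __init__(self, kili: Kili) -> None:
        self._kili = kili

    def __getattr__(self, __name: str) -> tp.Any:
        # Forward any attribute not found on the wrapper to the wrapped Kili client
        return getattr(self._kili, __name)
```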
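The wrapper avoids the rebuild because `KiliWrapper` is not a dataclass. `dataclasses.is_dataclass` inspects the class, so forwarding attributes through `__getattr__` does not make an instance look like one. A visitor that only rebuilds dataclasses and known collections should therefore pass the wrapper through unchanged.

One caveat applies if the wrapper is ever copied or pickled. Both `copy.deepcopy` and unpickling create the instance without calling `__init__` and then probe it for `__setstate__`. At that point `self._kili` does not exist yet, so an unconditional `__getattr__` recurses until `RecursionError`. Below is a guarded variant, assuming a hypothetical `FakeKili` dataclass in place of the real client so that it runs without the `kili` package:

```python
import copy
import typing as tp
from dataclasses import dataclass, is_dataclass


@dataclass
class FakeKili:
    """Hypothetical stand-in for the Kili client dataclass (no network access)."""

    api_key: str

    def users(self) -> list[dict]:
        return [{"id": "user-1"}]


class KiliWrapper:
    """Plain (non-dataclass) proxy that forwards attribute access to the wrapped client."""

    def __init__(self, kili: tp.Any) -> None:
        self._kili = kili

    def __getattr__(self, name: str) -> tp.Any:
        # __getattr__ only runs when normal lookup fails. copy/pickle create the instance
        # without calling __init__, so `_kili` may not exist yet: raise AttributeError
        # instead of recursing into __getattr__("_kili") forever.
        if name == "_kili":
            raise AttributeError(name)
        return getattr(self._kili, name)


wrapped = KiliWrapper(FakeKili(api_key="xxxx"))
print(is_dataclass(wrapped))    # False, so a dataclass-rebuilding visitor leaves it alone
print(wrapped.users())          # forwarded to FakeKili.users
clone = copy.deepcopy(wrapped)  # succeeds thanks to the guard
print(clone.api_key)            # xxxx
```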