Hey folks! I’d like to use python’s dataclasses an...
# prefect-community
m
Hey folks! I’d like to use python’s dataclasses and yamldataclassconfig packages to read job configurations from a yaml file. The code I have works fine outside of the context of a task, but when I wrap the code in a task I run into what looks like a fairly obscure error (in the thread). The implementation of the code is straightforward. I have a dataclass (partial version for illustration):
Copy code
@dataclass
class Config(YamlDataClassConfig):
    EMAIL_FROM: str = None
    EMAIL_TO: List[str] = None
as well as a task intended to instantiate the instance of the class and load data to it:
Copy code
@task(name='load-configs')
def load_configs(config_path: str) -> Config:
    configs = Config()
    configs.load(config_path)
    return configs
The task seems to instantiate the config object, but breaks when we try to actually load configs from the provided path. Again, this works perfectly fine outside of the context of a task, so presumably there is some internal prefect logic I’m futzing with here. I’d appreciate any thoughts!
1
k
Hi @Mary Clair Thompson, thanks for the issue. Do you mind condensing the error into the comment to make sure the channel is clean and easy to follow?
I’m testing out your code
Where did you install YamlDataClassConfig from?
a
@Mary Clair Thompson can you please move the code blocks into thread?
m
Copy code
Encountered exception during execution:
Traceback (most recent call last):
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/engine.py", line 1221, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "cyberrisk_grouper_write.py", line 106, in load_configs
    CONFIGS.load(config_path)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/yamldataclassconfig/config.py", line 35, in load
    self.__dict__.update(self.__class__.schema().load(dictionary_config).__dict__)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/marshmallow/schema.py", line 719, in load
    return self._do_load(
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/marshmallow/schema.py", line 892, in _do_load
    result = self._invoke_load_processors(
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/marshmallow/schema.py", line 1090, in _invoke_load_processors
    data = self._invoke_processors(
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/marshmallow/schema.py", line 1220, in _invoke_processors
    data = processor(data, many=many, **kwargs)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/dataclasses_json/mm.py", line 334, in make_instance
    return _decode_dataclass(cls, kvs, partial)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/dataclasses_json/core.py", line 152, in _decode_dataclass
    types = get_type_hints(cls)
  File "/srv/prefect/anaconda3/lib/python3.8/typing.py", line 1223, in get_type_hints
    base_globals = sys.modules[base.__module__].__dict__
KeyError: '__prefect_loader__'11:23:40 AMload-configs-f3cf34f7-0
Copy code
Encountered exception during execution:
Traceback (most recent call last):
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/engine.py", line 596, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "cyberrisk_grouper_write.py", line 114, in cyberrisk_flow
    ORACLE_CREDS_PATH, ORACLE_TABLE_DTYPE, ORACLE_TABLE_NAME, ORACLE_URL, ORACLE_SCHEMA, ORACLE_WRITE_MODE, = load_configs(config_path)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/tasks.py", line 295, in __call__
    return enter_task_run_engine(
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/engine.py", line 736, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 137, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/from_thread.py", line 35, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 847, in run_async_from_thread
    return f.result()
  File "/srv/prefect/anaconda3/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/srv/prefect/anaconda3/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/engine.py", line 1221, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "cyberrisk_grouper_write.py", line 106, in load_configs
    CONFIGS.load(config_path)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/yamldataclassconfig/config.py", line 35, in load
    self.__dict__.update(self.__class__.schema().load(dictionary_config).__dict__)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/marshmallow/schema.py", line 719, in load
    return self._do_load(
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/marshmallow/schema.py", line 892, in _do_load
    result = self._invoke_load_processors(
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/marshmallow/schema.py", line 1090, in _invoke_load_processors
    data = self._invoke_processors(
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/marshmallow/schema.py", line 1220, in _invoke_processors
    data = processor(data, many=many, **kwargs)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/dataclasses_json/mm.py", line 334, in make_instance
    return _decode_dataclass(cls, kvs, partial)
  File "/srv/prefect/anaconda3/lib/python3.8/site-packages/dataclasses_json/core.py", line 152, in _decode_dataclass
    types = get_type_hints(cls)
  File "/srv/prefect/anaconda3/lib/python3.8/typing.py", line 1223, in get_type_hints
    base_globals = sys.modules[base.__module__].__dict__
KeyError: '__prefect_loader__'
@Khuyen Tran just from pypi
Interestingly, I don’t get any such error when loading configs via pydantic/pydantic_yaml. I may just switch to using these
👍 1
k
yeah that’s probably the best
209 Views