[BUG] [Saving parquet] [Agents -> Workers] :lad...
# prefect-cloud
k
[BUG] [Saving parquet] [Agents -> Workers] 🐞 Hi guys, how are you? We are migrating our application from agents to workers (prefect 2.20.2). If we use Dask with a version lower than 2024.3.0, our application doesn't even start (TypeError("descriptor '__call__' for 'type' objects doesn't apply to a 'property' object")). If we use the latest versions of Dask, we can't save our parquet dataset: 'index' is not in list. Could someone help us with this migration?
[Traceback] [Saving parquet] Note: we can save .csv, but parquet doesn't work. Using the latest versions of Dask:
Traceback (most recent call last):
  File "/opt/prefect/flows/forge/domain/features/tasks/save_dataset.py", line 29, in save_dataset
    save_dataframe(
  File "/opt/prefect/flows/forge/domain/features/tasks/save_dataset.py", line 58, in save_dataframe
    df.to_parquet(
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_collection.py", line 3296, in to_parquet
    return to_parquet(self, path, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/io/parquet.py", line 594, in to_parquet
    df.to_legacy_dataframe(),
    ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_collection.py", line 1396, in to_legacy_dataframe
    df = self.optimize(**optimize_kwargs) if optimize else self
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_collection.py", line 591, in optimize
    return new_collection(self.expr.optimize(fuse=fuse))
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_expr.py", line 94, in optimize
    return optimize(self, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_expr.py", line 3070, in optimize
    return optimize_until(expr, stage)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_expr.py", line 3021, in optimize_until
    expr = result.simplify()
           ^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_core.py", line 374, in simplify
    new = expr.simplify_once(dependents=dependents, simplified={})
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_core.py", line 352, in simplify_once
    new = operand.simplify_once(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_core.py", line 352, in simplify_once
    new = operand.simplify_once(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_core.py", line 352, in simplify_once
    new = operand.simplify_once(
          ^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 4 more times]
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_core.py", line 335, in simplify_once
    out = child._simplify_up(expr, dependents)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_categorical.py", line 184, in _simplify_up
    return Categorize(result.frame, cats, result.operand("index"))
                                          ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/dask_expr/_core.py", line 176, in operand
    return self.operands[type(self)._parameters.index(key)]
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: 'index' is not in list
[Traceback] Flow doesn't start:
Traceback (most recent call last):
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/prefect/engine.py", line 427, in retrieve_flow_then_begin_flow_run
    else await load_flow_from_flow_run(
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/prefect/client/utilities.py", line 100, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/prefect/deployments/deployments.py", line 322, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 138, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2177, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 859, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/prefect/flows.py", line 1683, in load_flow_from_entrypoint
    flow = import_object(entrypoint)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/prefect/utilities/importtools.py", line 205, in import_object
    module = load_script_as_module(script_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/envs/forge/lib/python3.11/site-packages/prefect/utilities/importtools.py", line 168, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'forge/pipelines/models/materialize_dataset_flow.py' encountered an exception: TypeError("descriptor '__call__' for 'type' objects doesn't apply to a 'property' object")
c
Hey Kamilly - both of these issues appear to be dask issues, which means your best bet will be to open a bug report on the relevant dask repository. I found the descriptor issue for dask here, and you might want to open the `'index'` issue on the `dask_expr` repository here.
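If it helps with those bug reports, here is a minimal sketch for collecting the installed versions to paste alongside the tracebacks. The package list is an assumption (the names that show up in the tracebacks, plus `distributed`, `pandas` and `pyarrow`, which may or may not be relevant), and `collect_versions` / `format_report` are hypothetical helper names, not part of prefect or dask.

```python
from importlib import metadata

# Assumed list of packages worth reporting; adjust to your environment.
PACKAGES = ("prefect", "dask", "dask-expr", "distributed", "pandas", "pyarrow")


def collect_versions(packages=PACKAGES):
    """Map each package name to its installed version, or 'not installed'."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = "not installed"
    return versions


def format_report(versions):
    """Render the versions as one 'name: version' line per package."""
    return "\n".join(f"{name}: {version}" for name, version in versions.items())


if __name__ == "__main__":
    print(format_report(collect_versions()))
```

Running it inside the same conda environment the worker uses (`forge` in the tracebacks) should give the versions that actually produced the errors.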
k
Thank you, Chris!! 💖
馃 1