Hey all, we attempted to run a prefect flow that t...
# ask-community
r
Hey all, we attempted to run a prefect flow that triggers subflows on an EC2 instance with a
DaskTaskRunner
but got the following error:
Copy code
pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `ser_model`: AttributeError: 'NullFileSystem' object has no attribute 'get_block_type_slug'
What could that cryptic error message mean?
n
hi @Robin - any chance you have the full stack trace? this looks like a pydantic error related to result_storage
r
OK, we reconfigured a flow to run with DaskTaskRunner, so maybe some of the tasks attempt to store a result which is not serializable? Is there a way to tell prefect/dask to calculate "depth first" and not store results between task runs but to just keep them in memory, unless instructed to store results explicitly?
n
do you have an example / pseudo code of what you mean? cache_policy=NONE might be what you want since all tasks inherit settings by default but can opt out w kwargs in their own decorator
r
Where/How to set
cache_policy=NONE
?
one sec, let me switch machines so I can give you some more code as context 👍
r
Can I globally change the default?
Ah, just found the below. Doesn't that mean that caching is turned off by default?
n
that setting being off doesn't mean that caching is necessarily turned off it just that if its on then all results (where tasks do not explicitly opt out) are persisted
r
OK, thanks!
Will add the
cache_policy=None
in every task and see if that helps
n
just to avoid any confusion, it is an importable constant value
NONE
not the python builtin
None
(though I can see how this would be confusing and we should probably accept either to mean the same thing)
💡 1
r
pseudo code
Copy code
@flow(task_runner=dask)
def flow_a():

    for id in ids:
        flow_run_task.submit()

@task(cache_policy=None)
flow_run_task(kwargs):
    flow_b(kwargs)

@flow(task_runner=not_dask)
def flow_b():
    do_something()   
    do_something_else()

@task()
def do_something():
    ...

@task()
def do_something_else()
    ...
just to avoid any confusion, it is an importable constant value
NONE
not the python builtin
None
(though I can see how this would be confusing and we should probably accept either to mean the same thing)
That explains why my linter is complaining 🙈
Where to import
NONE
from? Seems to miss in the docs 🤔
from prefect.cache_policies import NONE
?
n
yep that's right
r
Seems like
cache_policy=None
did not prohibit the error 🤔
What surprises me is that the same code runs locally on my macbook without issues but fails when running on EC2 or ECS, but only when running with DaskTaskRunner. So it must be something related to the creation of (temporary) files related to dask, right?
I may have some time tomorrow to create a Minimal Reproducible Example
The inputs of the flows and tasks are mostly floats, integers, booleans, str, pd.Dataframes, dict, and datetime.dates
Here is the more complete stack trace:
Copy code
18:51:24.455 | ERROR   | Flow run 'polar-basilisk' - Finished in state Failed("Flow run encountered an exception: PydanticSerializationError: Error calling function `ser_model`: AttributeError: 'NullFileSystem' object has no attribute 'get_block_type_slug'")
Traceback (most recent call last):
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/blocks/core.py", line 354, in ser_model
    "block_type_slug": self.get_block_type_slug(),
                       ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/pydantic/main.py", line 811, in __getattr__
    raise AttributeError(f'{type(self).__name__!r} object has no attribute {item!r}')
AttributeError: 'NullFileSystem' object has no attribute 'get_block_type_slug'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/code/.../flow.py", line 104, in <module>
    run_emulations(
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/flows.py", line 1346, in __call__
    return run_flow(
           ^^^^^^^^^
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 1493, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 1373, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 347, in result
    raise self._raised
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 759, in run_context
    yield self
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 1371, in run_flow_sync
    engine.call_flow_fn()
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/flow_engine.py", line 782, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/code/core/.../flow.py", line 87, in run_emulations
    run_emulation.submit(
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/tasks.py", line 1179, in submit
    future = task_runner.submit(self, parameters, wait_for)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect_dask/task_runners.py", line 353, in submit
    future = self._client.submit(
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect_dask/client.py", line 35, in submit
    run_task_kwargs["context"] = serialize_context()
                                 ^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/context.py", line 78, in serialize_context
    "flow_run_context": flow_run_context.serialize() if flow_run_context else {},
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/prefect/context.py", line 373, in serialize
    return self.model_dump(
           ^^^^^^^^^^^^^^^^
  File "/home/ubuntu/code/core/.venv/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
    return self.__pydantic_serializer__.to_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `ser_model`: AttributeError: 'NullFileSystem' object has no attribute 'get_block_type_slug'
⚠️ if you suddenly get
AttributeError... NullFileSystem
downgrade prefect to
prefect==3.1.5
⚠️ The above error is now also raised locally. I changed back to
prefect==3.1.5
and the error disappears. I suggest you add an integration test to run a simplified flow like above. I may have the time later today to implement a PR that contains the failing flows as minimal reproducible example
Do you have a pointer on where to best add that test?
n
hi thanks @Robin - we’ll take a look at this asap
j
getting this same issue - is there any fix for this yet?
n
hi @Josiah Reeves - are you seeing this on 3.1.12?
j
yes, i've downgraded to 3.1.5 for now
n
if you have an MRE to share, that would be appreciated!