# prefect-community
h
Can anyone explain to me why this raises an error?
```python
import prefect
import sqlalchemy as sql


engine = sql.create_engine("postgresql://tmp:tmp@localhost:5432/tmp")
tmp_schema = sql.MetaData()
table = sql.Table("tmp", tmp_schema, sql.Column("tmp", sql.Integer))


@prefect.task
def create_table():
    tmp_schema.create_all(engine, tables=[table])


with prefect.Flow("tmp") as flow:
    create_table()


flow.storage = prefect.environments.storage.Docker(
    registry_url="localhost:5000",
    python_dependencies=["sqlalchemy"]
)
flow.register()
```
Raises
```
Traceback (most recent call last):
  File "issue.py", line 23, in <module>
    flow.register()
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1437, in register
    registered_flow = client.register(
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 649, in register
    serialized_flow = flow.serialize(build=build)  # type: Any
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/core/flow.py", line 1299, in serialize
    storage = self.storage.build()  # type: Optional[Storage]
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 293, in build
    self._build_image(push=push)
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 329, in _build_image
    dockerfile_path = self.create_dockerfile_object(directory=tempdir)
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 436, in create_dockerfile_object
    cloudpickle.dump(self._flows[flow_name], f)
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 48, in dump
    CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj)
  File "/Users/howardcornwell/files/dev/misc/prefect-pickle-issue/.venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 548, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread._local' object
```
z
Hi @Howard Cornwell, taking a look now! Will be back to you with an answer ASAP!
This looks like an issue serializing the SQLAlchemy engine. I'm not super familiar with what's going on under the hood of the SQLAlchemy engine, but moving the engine inside the `create_table` task seems to have resolved the issue. Is that a reasonable workaround for you? We can also open a GitHub issue if you'd like to keep digging!
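The limitation behind this workaround can be reproduced without Prefect or SQLAlchemy at all; even the standard-library `pickle` refuses a bare thread-local (a minimal sketch, using plain `pickle` as a stand-in for `cloudpickle`):

```python
import pickle
import threading

# A threading.local carries per-thread state that has no meaning in
# another process, so CPython refuses to pickle it outright.
try:
    pickle.dumps(threading.local())
except TypeError as exc:
    print(exc)  # cannot pickle '_thread._local' object
```

SQLAlchemy's engine keeps thread-local state internally (e.g. for connection handling), which is why any serializer that walks the engine's attributes hits this same wall.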
h
Hey, thanks for getting back. I’ve used a similar workaround in a few places this occurs e.g.
```python
def _create_company_footprint_table():
    create_table(engine, ingest_schema, arin_company_footprints)


@task
def create_company_footprint_table():
    # Call separate function to avoid
    # TypeError: cannot pickle '_thread._local' object
    _create_company_footprint_table()
```
I can use the workaround; it's not a huge issue. I was just curious whether there was a clear explanation I wasn't aware of.
j
@Howard Cornwell not all Python objects can be serialized to bytes and recovered. Prefect uses a generous serialization protocol (`cloudpickle`), but even it can't serialize the SQLAlchemy engine, due to the engine's reliance on thread-locals:
```
cloudpickle.dumps(sqlalchemy.create_engine('postgres://localhost:5432'))
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-8-a20a9641c605> in <module>
----> 1 cloudpickle.dumps(sqlalchemy.create_engine('postgres://localhost:5432'))

/usr/local/Caskroom/miniconda/base/envs/jlowin/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol)
     60     with io.BytesIO() as file:
     61         cp = CloudPickler(file, protocol=protocol)
---> 62         cp.dump(obj)
     63         return file.getvalue()
     64

/usr/local/Caskroom/miniconda/base/envs/jlowin/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    536     def dump(self, obj):
    537         try:
--> 538             return Pickler.dump(self, obj)
    539         except RuntimeError as e:
    540             if "recursion" in e.args[0]:

TypeError: cannot pickle '_thread._local' object
```
In your original code, the engine was instantiated at module level and serialized along with the task function as a global variable; in your updated code, it is created inside the task function at run time, so it is never serialized.
h
That makes sense! Thanks for the explanation, I really appreciate it
👍 1