# prefect-community
m
I have had this issue with 2 flows now: the UI shows a flow state of `Failed`, while every task and the flow itself work as expected. What could be causing this?
[10 February 2021 12:06pm]: Unexpected error: TypeError('Could not serialize object of type Success.\nTraceback (most recent call last):\n File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 49, in dumps\n result = pickle.dumps(x, **dump_kwargs)...
j
Hi @Mitchell Bregman, this error looks to me like you have a task returning some piece of data as its result that cannot be pickled. Any chance you have more information surrounding this error being raised?
m
@josh - do you want the full trace?
here is the flow
with flow:
    alert_on = Parameter(
        "alert_on", default=["aws-prod-phonixxdw-5-7", "sf_dms_db", "aws_cape_rpt_new"]
    )
    slack_channel = Parameter("slack_channel", default="#iceburgers_comms")
    cred_metadata = tasks.looker_login.map(endpoints)
    connection_info = tasks.get_connections.map(cred_metadata)
    ping_list = tasks.reduce_connections(connection_info)
    pings = tasks.ping_connection.map(ping_list)
    alert = tasks.alert_failure.map(pings, unmapped(alert_on), unmapped(slack_channel))
    store = tasks.snowflake.map(data=alert)
in case it's useful
j
Yeah a full trace could help here
m
copying from prefect UI
[10 February 2021 1:06pm]: Unexpected error: TypeError('Could not serialize object of type Success.\nTraceback (most recent call last):\n File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 49, in dumps\n result = pickle.dumps(x, **dump_kwargs)\nTypeError: can\'t pickle _thread.lock objects\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 297, in serialize\n header, frames = dumps(x, context=context) if wants_context else dumps(x)\n File "/usr/local/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 63, in pickle_dumps\n protocol=context.get("pickle-protocol", None) if context else None,\n File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 60, in dumps\n result = cloudpickle.dumps(x, **dump_kwargs)\n File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 102, in dumps\n cp.dump(obj)\n File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump\n return Pickler.dump(self, obj)\n File "/usr/local/lib/python3.7/pickle.py", line 437, in dump\n self.save(obj)\n File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n self.save_reduce(obj=obj, *rv)\n File "/usr/local/lib/python3.7/pickle.py", line 662, in save_reduce\n save(state)\n File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n f(self, obj) # Call unbound method with explicit self\n File "/usr/local/lib/python3.7/pickle.py", line 859, in save_dict\n self._batch_setitems(obj.items())\n File "/usr/local/lib/python3.7/pickle.py", line 885, in _batch_setitems\n save(v)\n File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n self.save_reduce(obj=obj, *rv)\n File "/usr/local/lib/python3.7/pickle.py", line 662, in save_reduce\n save(state)\n File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n 
f(self, obj) # Call unbound method with explicit self\n File "/usr/local/lib/python3.7/pickle.py", line 859, in save_dict\n self._batch_setitems(obj.items())\n File "/usr/local/lib/python3.7/pickle.py", line 885, in _batch_setitems\n save(v)\n File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n self.save_reduce(obj=obj, *rv)\n File "/usr/local/lib/python3.7/pickle.py", line 662, in save_reduce\n save(state)\n File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n f(self, obj) # Call unbound method with explicit self\n File "/usr/local/lib/python3.7/pickle.py", line 859, in save_dict\n self._batch_setitems(obj.items())\n File "/usr/local/lib/python3.7/pickle.py", line 885, in _batch_setitems\n save(v)\n File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n self.save_reduce(obj=obj, *rv)\n File "/usr/local/lib/python3.7/pickle.py", line 662, in save_reduce\n save(state)\n File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n f(self, obj) # Call unbound method with explicit self\n File "/usr/local/lib/python3.7/pickle.py", line 859, in save_dict\n self._batch_setitems(obj.items())\n File "/usr/local/lib/python3.7/pickle.py", line 885, in _batch_setitems\n save(v)\n File "/usr/local/lib/python3.7/pickle.py", line 549, in save\n self.save_reduce(obj=obj, *rv)\n File "/usr/local/lib/python3.7/pickle.py", line 662, in save_reduce\n save(state)\n File "/usr/local/lib/python3.7/pickle.py", line 504, in save\n f(self, obj) # Call unbound method with explicit self\n File "/usr/local/lib/python3.7/pickle.py", line 859, in save_dict\n self._batch_setitems(obj.items())\n File "/usr/local/lib/python3.7/pickle.py", line 885, in _batch_setitems\n save(v)\n File "/usr/local/lib/python3.7/pickle.py", line 524, in save\n rv = reduce(self.proto)\nTypeError: can\'t pickle _thread.lock objects\n')
the last task is the SnowflakeQuery built-in
j
Ah so it looks like one of your tasks is returning a `_thread.lock` object, which I have seen as an issue in the past. You said you are seeing it in two flows; are there any tasks that they share? I wonder if there's a chance that the SnowflakeQuery task could be the bug, and if so we should open an issue report for it
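One way to narrow down which part of a task result carries the lock is to try pickling each attribute of a suspect object in turn. A minimal sketch using only the standard library; `FakeResult` and `find_unpicklable` are hypothetical names made up for illustration, not Prefect or Snowflake APIs:

```python
import pickle
import threading


def find_unpicklable(obj):
    """Return the names of attributes on obj that the pickle module rejects."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception:  # TypeError or PicklingError, depending on the object
            bad.append(name)
    return bad


class FakeResult:
    """Hypothetical stand-in for a task result that holds a live connection."""

    def __init__(self):
        self.rows = [(1, "ok")]
        self._lock = threading.Lock()  # the kind of object named in the traceback


print(find_unpicklable(FakeResult()))
```

This only checks one level of attributes; a lock buried deeper (as the nested `save_dict` frames in the trace suggest) would need the same check applied recursively.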
m
yes, both flows use the `SnowflakeQuery` task
one of the flows is running an INSERT statement and the other is a CALL to a stored proc
i can build a custom SnowflakeQuery task but would rather just use the built-in for convenience
j
I wonder if you just copy the SnowflakeQuery task source and adjust the return type if it’ll fix it 🤔
m
and it is definitely the snowflake query, because i just added that task and this is the first time i'm seeing this error… before, i was just alerting and not storing the info
what's the current return type?
j
Ah I found an issue where this is raised! https://github.com/PrefectHQ/prefect/issues/3744
So this is known
The current return type is a `cursor.execute` result from the snowflake connector library
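If the returned cursor is what holds the lock, a copied task could fetch the rows and return those instead of the cursor. A rough sketch of that idea, with `sqlite3` standing in for the Snowflake connector (both follow the Python DB-API, but this is an assumption about how a copy might look, not the built-in task's source); `run_query` is a hypothetical helper:

```python
import pickle
import sqlite3


def run_query(conn, query):
    """Execute query and return plain rows rather than the cursor object."""
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        return cursor.fetchall()  # list of tuples, safe to pickle
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")  # stand-in connection for the example
rows = run_query(conn, "SELECT 1 AS a, 'x' AS b")
# The rows survive a pickle round trip, which is what a distributed executor needs.
assert pickle.loads(pickle.dumps(rows)) == rows
conn.close()
```

For an INSERT or a stored procedure call, the task may not need to return the rows at all; returning nothing or a simple status value would avoid the problem in the same way.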
m
gotcha
so interestingly, this also seems to tie into the DaskExecutor
when running with a LocalExecutor, no issues are found… no issues with LocalDaskExecutor either
j
Hm yeah I believe that the Dask executor requires that the results are pickleable in order to pass them around
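A small standard-library sketch of why the executor might matter, assuming the difference comes down to whether results stay in one process or get serialized. `HoldsLock` and `downstream` are hypothetical names; no Prefect or Dask code is involved:

```python
import pickle
import threading


class HoldsLock:
    """Hypothetical task result that carries a thread lock."""

    def __init__(self):
        self.lock = threading.Lock()


def downstream(result):
    """Pretend downstream task: only needs the object to exist in memory."""
    return result.lock.acquire(blocking=False)


result = HoldsLock()

# Same process: the object is handed over by reference, nothing is serialized.
in_process_ok = downstream(result)

# Across processes: the object must be pickled first, and that step fails.
try:
    pickle.dumps(result)
    crossed_process = True
except TypeError:
    crossed_process = False

print(in_process_ok, crossed_process)
```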
m
gotcha
ok - keep me posted, i'll follow that issue thread