I’m getting an error of “Cannot serialize socket object”. The error says “Can’t pickle function <myFunction>: it’s not the same object as Helper_Functions.myFunction”. Any suggestions on how to fix this? I’m mapping over a task with 3 unmapped parameters, which are the results of other tasks.
Hey @Nathan Molby what are the results of the task you’re mapping over? Looks like it’s returning something that contains a socket which cannot be pickled
It results in a list which works to map over. It appears that the error is being caused because one of the unmapped parameters cannot be pickled even though I have successfully pickled it in a previous task.
The error is most certainly occurring when I attempt to use one of my parameters as unmapped. I have included this in the mapped parameter as a constant in a tuple (i.e. (1, constant), (2, constant), (3, constant)) and it is working fine. I don’t understand where the error is coming from.
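For anyone following along, the tuple workaround described above can be sketched in plain Python. This is only an illustration of the pairing idea, not Prefect code: `pair_with_constant` and `process` are hypothetical names, and the built-in `map` stands in for a mapped task.

```python
def pair_with_constant(items, constant):
    """Attach the same constant to every item so only one argument is mapped."""
    return [(item, constant) for item in items]


def process(pair):
    """Hypothetical task body: unpack the tuple instead of taking two arguments."""
    item, constant = pair
    return item * constant


pairs = pair_with_constant([1, 2, 3], 10)
results = list(map(process, pairs))  # the built-in map stands in for a mapped task
print(pairs)
print(results)
```

The trade-off is that the constant is copied into every element, so this only makes sense when the constant is small and can itself be pickled.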
Would you be willing to post a reproducible example?
Yeah I will post it later today and tag you on it. Thanks for the help.
We are getting the same error. We are running a Dask local cluster on Python 3.8.
[2020-03-16 04:44:12,974] [ERROR] [prefect.FlowRunner] [inner():66] Unexpected error: TypeError("cannot pickle '_thread.lock' object")
Traceback (most recent call last):
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/worker.py", line 3284, in dumps_function
    result = cache_dumps[func]
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/utils.py", line 1518, in __getitem__
    value = super().__getitem__(key)
  File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 1003, in __getitem__
    raise KeyError(key)
KeyError: <bound method FlowRunner.run_task of <FlowRunner: extraction_data_quality_tests>>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 38, in dumps
    result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
_pickle.PicklingError: Can't pickle <function fetch_table_info_for_tests at 0x120cc3700>: it's not the same object as __main__.fetch_table_info_for_tests

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 440, in get_flow_run_state
    task_states[task] = executor.submit(
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 137, in submit
    future = self.client.submit(fn, *args, **kwargs)
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/client.py", line 1513, in submit
    futures = self._graph_to_futures(
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/client.py", line 2511, in _graph_to_futures
    "tasks": valmap(dumps_task, dsk3),
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/toolz/dicttoolz.py", line 83, in valmap
    rv.update(zip(iterkeys(d), map(func, itervalues(d))))
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/worker.py", line 3317, in dumps_task
    d = {"function": dumps_function(task[1]), "args": warn_dumps(task[2])}
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/worker.py", line 3286, in dumps_function
    result = pickle.dumps(func)
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 51, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 62, in dumps
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 546, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.lock' object
[2020-03-16 04:44:12,984] [DEBUG] [prefect.FlowRunner] [call_runner_target_handlers():113] Flow 'extraction_data_quality_tests': Handling state change from Running to Failed
[2020-03-16 04:44:12,984] [ERROR] [prefect.Flow: extraction_data_quality_tests] [run():1033] Unexpected error occured in FlowRunner: TypeError("cannot pickle '_thread.lock' object")
prefect 0.9.7
@Mark McDonald
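As a side note on the "it's not the same object as" message in the traceback: one common way to trigger it with the standard `pickle` module is to rebind a module-level name so that it no longer points at the function being pickled. The sketch below reproduces only that mechanism. Whether it is the cause in this flow is an assumption, and `fetch_table_info` and `try_pickle` are hypothetical names.

```python
import pickle


def fetch_table_info():
    """Hypothetical function standing in for a task's underlying function."""
    return ["table_a", "table_b"]


# Keep a handle on the original function, then rebind the module-level name,
# similar to what happens when a wrapper object replaces the function.
original_function = fetch_table_info
fetch_table_info = {"wrapped": original_function}  # the name now points elsewhere


def try_pickle(obj):
    """Return None on success, or the exception type name on failure."""
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:
        return type(exc).__name__


# pickle stores functions by name and checks that the name still resolves
# to the very same object, so the lookup above no longer matches.
error_name = try_pickle(original_function)
print(error_name)
```

In the traceback, Dask falls back to cloudpickle after this first failure, and it is that second attempt that hits the `_thread.lock` object.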
@Braun Reyes Is yours also related to an unmapped parameter? And do you have a reproducible example?
We have a massive data quality test flow, lol. I could certainly see if we could dumb it down some to try and force a recreation. Yes, there are unmapped parameters. Here is the flow object:
with Flow(pathlib.Path(__file__).stem, SCHEDULE, HANDLER) as flow:
    profiles = generate_profiles()
    environment = Parameter('environment', default=None) # options: acquisition_public, reports_public
    timestamp_column = Parameter('timestamp_column', default=None)
    test_type = Parameter('test_type', default=None) # options: out_of_sync_missing, out_of_sync_jsonb
    env_config = setup(environment, test_type)
    snowflake_setup = create_snowflake_objects(env_config)
    tables = fetch_table_info_for_tests(env_config, timestamp_column, test_type)
    test_suite = create_test.map(tables, unmapped(env_config), unmapped(test_type))
    dbt_operations = dbt_task(command='dbt run-operation create_udfs')
    test_command = test_command_task(tag=test_type)
    dbt_test = dbt_task(command=test_command)
    successful_run = update_state()
    failed_run = analyze_failures()


I think I found the issue.
You cannot have objects instantiated outside of the task object (Dask delayed object) and then referenced inside of the task object.
Was just about to comment back! Yeah that would do it. There isn’t a place in bare Prefect where a thread lock would be used when running your flow, so it must have been something you were referencing.
In our case it was because we were referencing the tunnel object here inside of our tasks.
if __name__ == '__main__':
    _local = True  # Local execution flag
    load_dotenv(dotenv_path=os.path.join(os.getcwd(), 'env/dev'))
    if is_serializable(flow):
        print('This flow can be serialized by Cloudpickle')
    else:
        print('There is an issue with this flow, it will not be pickled correctly')
    #set up ssh tunnel for connecting to reports db through bastion host
    tunnel = SSHTunnelForwarder(
        remote_bind_address=(os.getenv('CCDE_CONFIG__REPORTS_PUBLIC_HOST'), 5432))

    tunnel.start()  # Start the tunnel
    from prefect.engine.executors import DaskExecutor
    flow.schedule = None  # Ignore the Flow schedule for manual run
    executor = DaskExecutor(n_workers=1, threads_per_worker=2)
    flow.run(
        executor=executor,
        **{"environment": "reports_public", "timestamp_column": "created_at", "test_type": "out_of_sync_missing"}
    )
    tunnel.stop()  # Stop the tunnel
@Nathan Molby look to see if you have any instantiated objects outside of your tasks being referenced in your tasks, like a DB connection or something like that.
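A rough sketch of that advice, using only the standard library: `FakeTunnel` is a hypothetical stand-in for something like an SSH tunnel or DB connection that holds a thread lock, and `query_with_local_tunnel` is a hypothetical task body. The idea is to pass plain configuration into the task and build the live object inside it, so nothing unpicklable has to be shipped to a worker.

```python
import pickle
import threading


class FakeTunnel:
    """Hypothetical stand-in for a connection-like object that holds a lock."""

    def __init__(self, port):
        self.port = port
        self._lock = threading.Lock()  # thread locks cannot be pickled


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


# A live object created outside the task cannot be shipped to a worker.
tunnel = FakeTunnel(5432)
print(is_picklable(tunnel))

# Plain configuration can be shipped without trouble.
config = {"port": 5432}
print(is_picklable(config))


def query_with_local_tunnel(config):
    """Hypothetical task body: build the resource inside the task itself."""
    local_tunnel = FakeTunnel(config["port"])
    with local_tunnel._lock:
        return f"connected on port {local_tunnel.port}"


print(query_with_local_tunnel(config))
```

The same shape applies to real connections: the task receives host, port, or credentials, and opens and closes the connection within its own body.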