
Nathan Molby

03/16/2020, 2:39 PM
I’m getting an error of “Cannot serialize socket object”. The error says “Can’t pickle function <myFunction>: it’s not the same object as Helper_Functions.myFunction”. Any suggestions on how to fix this? I’m mapping over a task with 3 unmapped parameters, which are the results of other tasks.
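For context, the second message is the error Python's standard pickle module raises when it pickles a function by reference (module plus name) and finds that the module-level name now points at a different object. One plausible trigger, assumed here rather than confirmed in this thread, is Prefect's @task decorator, which replaces the function's name with a Task object. The sketch below reproduces the error with the standard library only; fake_task and FakeTask are hypothetical stand-ins for the decorator and the Task class:

```python
import pickle


class FakeTask:
    """Hypothetical stand-in for the Task object a decorator like @task returns."""

    def __init__(self, fn):
        self.fn = fn


def fake_task(fn):
    # Rebinds the decorated name to a FakeTask, as a task decorator would.
    return FakeTask(fn)


@fake_task
def fetch_table_info():
    return ["table_a", "table_b"]


# The raw function still exists, but its module-level name now points at the FakeTask,
# so pickle's by-reference lookup finds a different object and refuses.
raw_function = fetch_table_info.fn

try:
    pickle.dumps(raw_function)
    error = None
except pickle.PicklingError as exc:
    error = exc

print(error)
```

In the Dask traceback later in this thread, this PicklingError is caught and Dask falls back to cloudpickle, so the error that actually matters is the one raised after that fallback.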

josh

03/16/2020, 2:44 PM
Hey @Nathan Molby, what are the results of the task you’re mapping over? It looks like it’s returning something that contains a socket, which cannot be pickled.

Nathan Molby

03/16/2020, 2:54 PM
It returns a list, which maps fine. It appears that the error is caused by one of the unmapped parameters not being picklable, even though I have successfully pickled it in a previous task.
The error definitely occurs when I pass one of my parameters as unmapped. If I instead include it in the mapped parameter as a constant in a tuple (i.e. (1, constant), (2, constant), (3, constant)), it works fine. I don’t understand where the error is coming from.

josh

03/16/2020, 3:00 PM
Would you be willing to post a reproducible example?

Nathan Molby

03/16/2020, 3:01 PM
Yeah I will post it later today and tag you on it. Thanks for the help.

Braun Reyes

03/16/2020, 6:23 PM
We are getting the same error. We are running a Dask local cluster on Python 3.8:
[2020-03-16 04:44:12,974] [ERROR] [prefect.FlowRunner] [inner():66] Unexpected error: TypeError("cannot pickle '_thread.lock' object")
Traceback (most recent call last):
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/worker.py", line 3284, in dumps_function
    result = cache_dumps[func]
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/utils.py", line 1518, in __getitem__
    value = super().__getitem__(key)
  File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 1003, in __getitem__
    raise KeyError(key)
KeyError: <bound method FlowRunner.run_task of <FlowRunner: extraction_data_quality_tests>>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 38, in dumps
    result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
_pickle.PicklingError: Can't pickle <function fetch_table_info_for_tests at 0x120cc3700>: it's not the same object as __main__.fetch_table_info_for_tests

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 440, in get_flow_run_state
    task_states[task] = executor.submit(
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 137, in submit
    future = self.client.submit(fn, *args, **kwargs)
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/client.py", line 1513, in submit
    futures = self._graph_to_futures(
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/client.py", line 2511, in _graph_to_futures
    "tasks": valmap(dumps_task, dsk3),
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/toolz/dicttoolz.py", line 83, in valmap
    rv.update(zip(iterkeys(d), map(func, itervalues(d))))
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/worker.py", line 3317, in dumps_task
    d = {"function": dumps_function(task[1]), "args": warn_dumps(task[2])}
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/worker.py", line 3286, in dumps_function
    result = pickle.dumps(func)
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 51, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 62, in dumps
    cp.dump(obj)
  File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 546, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.lock' object
[2020-03-16 04:44:12,984] [DEBUG] [prefect.FlowRunner] [call_runner_target_handlers():113] Flow 'extraction_data_quality_tests': Handling state change from Running to Failed
[2020-03-16 04:44:12,984] [ERROR] [prefect.Flow: extraction_data_quality_tests] [run():1033] Unexpected error occured in FlowRunner: TypeError("cannot pickle '_thread.lock' object")
prefect 0.9.7
@Mark McDonald
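The final TypeError in that traceback is what you get whenever something being pickled contains a threading lock, however deeply nested. A minimal sketch with the standard library; the SimpleNamespace below is a hypothetical stand-in for a tunnel or connection object that keeps a lock in its state:

```python
import pickle
import threading
from types import SimpleNamespace

# Hypothetical connection-like object: a picklable field plus a lock.
fake_tunnel = SimpleNamespace(host="bastion.example.com", _lock=threading.Lock())

try:
    pickle.dumps(fake_tunnel)
    pickle_error = None
except TypeError as exc:
    # The lock inside the object's state, not the object itself, is what pickle rejects.
    pickle_error = exc

print(pickle_error)
```

cloudpickle raises the same TypeError in this situation, as the traceback shows: it can serialize more kinds of functions than plain pickle, but not locks or sockets.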

josh

03/16/2020, 6:26 PM
@Braun Reyes Is yours also related to an unmapped parameter? And do you have a reproducible example?

Braun Reyes

03/16/2020, 6:29 PM
We have a massive data quality test flow, lol. We could certainly try to pare it down to force a reproduction. Yes, there are unmapped parameters. Here is the flow object:
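import pathlib

from prefect import Flow, Parameter, unmapped

# generate_profiles, setup, create_test, dbt_task, etc. are tasks defined elsewhere in this module
with Flow(pathlib.Path(__file__).stem, SCHEDULE, HANDLER) as flow: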
    profiles = generate_profiles()
    environment = Parameter('environment', default=None) # options: acquisition_public, reports_public
    timestamp_column = Parameter('timestamp_column', default=None)
    test_type = Parameter('test_type', default=None) # options: out_of_sync_missing, out_of_sync_jsonb
    env_config = setup(environment, test_type)
    snowflake_setup = create_snowflake_objects(env_config)
    tables = fetch_table_info_for_tests(env_config, timestamp_column, test_type)
    test_suite = create_test.map(tables, unmapped(env_config), unmapped(test_type))
    dbt_operations = dbt_task(command='dbt run-operation create_udfs')
    test_command = test_command_task(tag=test_type)
    dbt_test = dbt_task(command=test_command)
    successful_run = update_state()
    failed_run = analyze_failures()

    dbt_operations.set_upstream(snowflake_setup)
    dbt_operations.set_upstream(test_suite)
    dbt_operations.set_upstream(profiles)
    dbt_test.set_upstream(dbt_operations)
    failed_run.set_upstream(dbt_test)
    successful_run.set_upstream(dbt_test)

    flow.set_reference_tasks([dbt_test])
I think I found the issue: you cannot instantiate an object outside of a task (the Dask delayed object) and then reference it inside the task, at least not when that object can't be pickled.

josh

03/16/2020, 7:02 PM
Was just about to comment back! Yeah, that would do it. There isn’t anywhere in bare Prefect where a thread lock would be used when running your flow, so it must have been something you were referencing.

Braun Reyes

03/16/2020, 7:02 PM
In our case, it was because we were referencing the tunnel object below from inside our tasks:
import os

from dotenv import load_dotenv
from prefect.engine.executors import DaskExecutor
from prefect.utilities.debug import is_serializable
from sshtunnel import SSHTunnelForwarder

if __name__ == '__main__':
    _local = True  # Local execution flag
    load_dotenv(dotenv_path=os.path.join(os.getcwd(), 'env/dev'))
    if is_serializable(flow):
        print('This flow can be serialized by Cloudpickle')
    else:
        print('There is an issue with this flow, it will not be pickled correctly')
    # Set up an SSH tunnel for connecting to the reports DB through the bastion host.
    # Note: this tunnel is a module-level global, so any task that references it
    # drags it (and its thread lock) into the pickle Dask builds for that task.
    tunnel = SSHTunnelForwarder(
        os.getenv('CCDE_BASTION_HOST'),
        ssh_username=os.getenv('CCDE_BASTION_USER'),
        ssh_pkey=os.getenv('CCDE_BASTION_KEY_PATH'),
        ssh_private_key_password=os.getenv('CCDE_BASTION_KEY_PW'),
        remote_bind_address=(os.getenv('CCDE_CONFIG__REPORTS_PUBLIC_HOST'), 5432))

    tunnel.start()  # Start the tunnel
    try:
        flow.schedule = None  # Ignore the Flow schedule for manual run
        executor = DaskExecutor(n_workers=1, threads_per_worker=2)
        flow.run(
            executor=executor,
            **{"environment": "reports_public", "timestamp_column": "created_at", "test_type": "out_of_sync_missing"}
        )
    finally:
        tunnel.stop()  # Stop the tunnel even if the run fails
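One way to avoid this, sketched here as an assumption rather than a fix anyone in this thread confirmed, is to create the tunnel inside the task that needs it, so only plain values such as table names and host strings cross the task boundary. FakeTunnel, open_tunnel, and fetch_rows are hypothetical; in a real flow the body of fetch_rows would live in a @task-decorated function and FakeTunnel would be replaced by SSHTunnelForwarder:

```python
import threading
from contextlib import contextmanager


class FakeTunnel:
    """Hypothetical stand-in for SSHTunnelForwarder: it holds a lock, so it can't be pickled."""

    def __init__(self, host):
        self.host = host
        self._lock = threading.Lock()
        self.is_active = False

    def start(self):
        with self._lock:
            self.is_active = True

    def stop(self):
        with self._lock:
            self.is_active = False


@contextmanager
def open_tunnel(host):
    """Create and start a tunnel, and always stop it when the caller's block exits."""
    tunnel = FakeTunnel(host)
    tunnel.start()
    try:
        yield tunnel
    finally:
        tunnel.stop()


def fetch_rows(table, host):
    # Only plain, picklable arguments (strings) come in from outside;
    # the unpicklable tunnel is created and destroyed entirely inside this call.
    with open_tunnel(host) as tunnel:
        return f"rows from {table} via {tunnel.host} (active={tunnel.is_active})"


print(fetch_rows("orders", "bastion.example.com"))
```

The context manager guarantees stop() runs even if the query raises. The trade-off is opening one tunnel per task run.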
@Nathan Molby check whether your tasks reference any objects instantiated outside of them, like a DB connection or something similar.
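A rough way to follow this advice programmatically is to list the module-level names a function reads and try pickling each current value. find_unpicklable_globals below is a hypothetical helper, not part of Prefect or Dask. It uses plain pickle, inspects only the function's own bytecode names (not nested functions or comprehensions), and skips modules, functions, and classes, which cloudpickle can usually serialize on its own:

```python
import pickle
import threading
import types


def find_unpicklable_globals(fn):
    """Map each module-level name fn reads to the exception type pickle raises for its value."""
    problems = {}
    for name in fn.__code__.co_names:
        if name not in fn.__globals__:
            continue  # not bound yet (or a builtin): nothing would be captured
        value = fn.__globals__[name]
        if isinstance(value, (types.ModuleType, types.FunctionType, type)):
            continue  # skipped to avoid false positives from plain pickle
        try:
            pickle.dumps(value)
        except Exception as exc:
            problems[name] = type(exc).__name__
    return problems


def query_reports():
    # shared_lock is a hypothetical global, bound only after this function is defined.
    with shared_lock:
        return "ok"


before = find_unpicklable_globals(query_reports)  # shared_lock does not exist yet
shared_lock = threading.Lock()
after = find_unpicklable_globals(query_reports)   # now it is found and rejected
print(before, after)
```

The before/after results show that such a check depends on timing: a global that does not exist yet is simply not captured. That may be why the is_serializable(flow) check in the earlier snippet passed, since it runs before tunnel is created.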