Nathan Molby
03/16/2020, 2:39 PMjosh
03/16/2020, 2:44 PMNathan Molby
03/16/2020, 2:54 PMNathan Molby
03/16/2020, 2:59 PMjosh
03/16/2020, 3:00 PMNathan Molby
03/16/2020, 3:01 PMBraun Reyes
03/16/2020, 6:23 PM[2020-03-16 04:44:12,974] [ERROR] [prefect.FlowRunner] [inner():66] Unexpected error: TypeError("cannot pickle '_thread.lock' object")
Traceback (most recent call last):
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/worker.py", line 3284, in dumps_function
result = cache_dumps[func]
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/utils.py", line 1518, in __getitem__
value = super().__getitem__(key)
File "/usr/local/Cellar/python@3.8/3.8.1/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 1003, in __getitem__
raise KeyError(key)
KeyError: <bound method FlowRunner.run_task of <FlowRunner: extraction_data_quality_tests>>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 38, in dumps
result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
_pickle.PicklingError: Can't pickle <function fetch_table_info_for_tests at 0x120cc3700>: it's not the same object as __main__.fetch_table_info_for_tests
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 440, in get_flow_run_state
task_states[task] = executor.submit(
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 137, in submit
future = self.client.submit(fn, *args, **kwargs)
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/client.py", line 1513, in submit
futures = self._graph_to_futures(
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/client.py", line 2511, in _graph_to_futures
"tasks": valmap(dumps_task, dsk3),
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/toolz/dicttoolz.py", line 83, in valmap
rv.update(zip(iterkeys(d), map(func, itervalues(d))))
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/worker.py", line 3317, in dumps_task
d = {"function": dumps_function(task[1]), "args": warn_dumps(task[2])}
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/worker.py", line 3286, in dumps_function
result = pickle.dumps(func)
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 51, in dumps
return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 62, in dumps
cp.dump(obj)
File "/Users/braunreyes/cc_projects/ccde-data-quality-tests/venv/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 546, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.lock' object
[2020-03-16 04:44:12,984] [DEBUG] [prefect.FlowRunner] [call_runner_target_handlers():113] Flow 'extraction_data_quality_tests': Handling state change from Running to Failed
[2020-03-16 04:44:12,984] [ERROR] [prefect.Flow: extraction_data_quality_tests] [run():1033] Unexpected error occured in FlowRunner: TypeError("cannot pickle '_thread.lock' object")
Braun Reyes
03/16/2020, 6:24 PMBraun Reyes
03/16/2020, 6:24 PMjosh
03/16/2020, 6:26 PMBraun Reyes
03/16/2020, 6:29 PMwith Flow(pathlib.Path(__file__).stem, SCHEDULE, HANDLER) as flow:
# Flow definition. The first argument passed to Flow is this file's stem
# (its name without the extension); SCHEDULE and HANDLER are defined outside
# this snippet.
profiles = generate_profiles()
# Run-time parameters; each one defaults to None.
environment = Parameter('environment', default=None) # options: acquisition_public, reports_public
timestamp_column = Parameter('timestamp_column', default=None)
test_type = Parameter('test_type', default=None) # options: out_of_sync_missing, out_of_sync_jsonb
# env_config is derived from the parameters and then fed to the Snowflake
# setup task, the table lookup and (unmapped) every mapped create_test run.
env_config = setup(environment, test_type)
snowflake_setup = create_snowflake_objects(env_config)
tables = fetch_table_info_for_tests(env_config, timestamp_column, test_type)
# create_test is mapped over `tables`; env_config and test_type are wrapped in
# unmapped() so they are passed whole to each mapped run rather than iterated.
test_suite = create_test.map(tables, unmapped(env_config), unmapped(test_type))
dbt_operations = dbt_task(command='dbt run-operation create_udfs')
# The dbt test command comes from test_command_task, tagged by test_type.
test_command = test_command_task(tag=test_type)
dbt_test = dbt_task(command=test_command)
successful_run = update_state()
failed_run = analyze_failures()
# Explicit ordering (no data is passed along these edges):
#   snowflake_setup, test_suite, profiles -> dbt_operations -> dbt_test
#   dbt_test -> failed_run and dbt_test -> successful_run
# NOTE(review): presumably failed_run / successful_run carry triggers that make
# only one of them run depending on dbt_test's outcome -- their definitions are
# not visible here, confirm.
dbt_operations.set_upstream(snowflake_setup)
dbt_operations.set_upstream(test_suite)
dbt_operations.set_upstream(profiles)
dbt_test.set_upstream(dbt_operations)
failed_run.set_upstream(dbt_test)
successful_run.set_upstream(dbt_test)
# dbt_test is the only reference task, so (per Prefect's reference-task
# semantics) the flow's final state follows dbt_test rather than the two
# follow-up tasks.
flow.set_reference_tasks([dbt_test])
Braun Reyes
03/16/2020, 6:59 PMBraun Reyes
03/16/2020, 7:01 PMjosh
03/16/2020, 7:02 PMBraun Reyes
# (chat timestamp: 03/16/2020, 7:02 PM)
if __name__ == '__main__':
    # Manual local run of `flow` on a Dask executor. An SSH tunnel through the
    # bastion host to the reports database is kept open for the duration of
    # the run and is always closed afterwards.

    # Imported before the tunnel is opened so an import failure cannot leave
    # a tunnel running.
    from prefect.engine.executors import DaskExecutor

    _local = True  # Local execution flag
    load_dotenv(dotenv_path=os.path.join(os.getcwd(), 'env/dev'))

    if is_serializable(flow):
        print('This flow can be serialized by Cloudpickle')
    else:
        print('There is an issue with this flow, it will not be pickled correctly')

    # Set up ssh tunnel for connecting to reports db through bastion host.
    # All connection settings are read from the environment loaded above.
    # NOTE(review): a started tunnel holds thread locks. If any task function
    # defined in this module refers to `tunnel` as a global, cloudpickle would
    # try to serialize it along with the function when Dask ships the task,
    # which fails with "cannot pickle '_thread.lock' object" -- unconfirmed
    # from this snippet, verify against the task definitions.
    tunnel = SSHTunnelForwarder(
        os.getenv('CCDE_BASTION_HOST'),
        ssh_username=os.getenv('CCDE_BASTION_USER'),
        ssh_pkey=os.getenv('CCDE_BASTION_KEY_PATH'),
        ssh_private_key_password=os.getenv('CCDE_BASTION_KEY_PW'),
        remote_bind_address=(os.getenv('CCDE_CONFIG__REPORTS_PUBLIC_HOST'), 5432),
    )

    tunnel.start()  # Start the tunnel
    try:
        flow.schedule = None  # Ignore the Flow schedule for manual run
        executor = DaskExecutor(n_workers=1, threads_per_worker=2)
        flow.run(
            executor=executor,
            **{
                "environment": "reports_public",
                "timestamp_column": "created_at",
                "test_type": "out_of_sync_missing",
            }
        )
    finally:
        # Close the tunnel even when the run raises or is interrupted;
        # otherwise its forwarding threads keep running.
        tunnel.stop()  # Stop the tunnel
Braun Reyes
03/16/2020, 7:03 PMBraun Reyes
03/16/2020, 7:06 PM