Dekel R
04/30/2023, 1:49 PM12:36:08.900 | INFO | Flow run 'logical-peacock' - Created task run 'calculate_additional_features-0' for task 'calculate_additional_features'
2023-04-30 12:36:08,915 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
What is the problem here? what am I missing?
Sharing my flow code as a comment on this message.
Setup - running locally with pycharm’s docker compose interpeter, Prefect 2.10.4 prefect-dask==0.2.3
Thanks.from prefect import flow, get_run_logger, unmapped
from src.transformatio_tasks import get_asins_to_transform, load_model_embedding_df,\
check_existing_transformed_asins, get_pdp_data, load_nth_recent_file_from_storage,\
get_titles_without_brand_and_unit_words, sentence_preprocess, calculate_additional_features
from alerts import send_slack_alert
from filter_layer_logic import save_to_partiotions
from src.adp_tasks import missing_asins_request
from math import ceil
import pandas as pd
from datetime import datetime
from src.configurations import PRODUCTION_BUCKET, TF_IDF_FOLDER, TRANSFORM_BATCH_SIZE
from prefect_dask.task_runners import DaskTaskRunner
@flow(
name='similarity_filter_layer',
description='Preprocessing for similarity filter layer',
retries=2,
task_runner=DaskTaskRunner()
)
def flow(
is_onboarding: bool = False,
onboarding_asins_path: str = None
):
logger = get_run_logger()
<http://logger.info|logger.info>("Starting filter layer preprocessing")
# missing_asins_request.submit()
required_model_asins = load_model_embedding_df.submit()
existing_transformed_asins = check_existing_transformed_asins.submit(asins_to_check=required_model_asins)
asins_to_transform = get_asins_to_transform.submit(required_model_asins=required_model_asins,
existing_transformed_asins=existing_transformed_asins)
<http://logger.info|logger.info>('Starting transform process')
pdp_df = get_pdp_data.submit(asins_to_transform = asins_to_transform).result()
tf_idf_dict = load_nth_recent_file_from_storage.submit(bucket=PRODUCTION_BUCKET, folder=TF_IDF_FOLDER, file_rank=1)
total_batches = ceil(len(pdp_df) / TRANSFORM_BATCH_SIZE)
# total_batches = 1
transform_df_list = []
<http://logger.info|logger.info>(f"Starting transform per batch loop")
for i in range(total_batches):
pdp_slice = pdp_df.iloc[i * TRANSFORM_BATCH_SIZE:(i + 1) * TRANSFORM_BATCH_SIZE]
df, title_without_brand_list, unit_words = get_titles_without_brand_and_unit_words.submit(input_df=pdp_slice).result()
processed_title_list = sentence_preprocess.map(unit_words=unmapped(unit_words), is_stem=unmapped(True),
sentence=title_without_brand_list)
processed_title_df = pd.DataFrame(df['asin'].to_list(), columns=['asin'])
processed_title_df['title_processed_stem'] = pd.Series(processed_title_list)
transform_df = calculate_additional_features.submit(df=pdp_slice, tf_idf_dict=tf_idf_dict, tf_idf_partition_var='cat_0',
processed_title_df=processed_title_df).result()
<http://logger.info|logger.info>(f'Transform batch {i + 1} / {total_batches} done')
transform_df_list.append(transform_df)
del transform_df
<http://logger.info|logger.info>(f"Finished transforming")
full_transformed_df = pd.concat(transform_df_list)
full_transformed_df['transform_ts'] = datetime.now()
saved_to_partitions = save_to_partiotions.submit(wait_for=full_transformed_df)
saved_to_partitions_state = saved_to_partitions.wait()
if saved_to_partitions_state.is_failed():
<http://logger.info|logger.info>("The filter layer flow failed! Look at the logs")
send_slack_alert("The filter layer flow failed! Look at the logs")
else:
<http://logger.info|logger.info>("The filter layer flow finished running!")
send_slack_alert("The filter layer flow finished running!")
if __name__ == '__main__':
flow()
I get the error in this line -
transform_df = calculate_additional_features.submit(df=pdp_slice, tf_idf_dict=tf_idf_dict, tf_idf_partition_var='cat_0',
processed_title_df=processed_title_df).result()
processed_title_df = pd.DataFrame(df['asin'].to_list(), columns=['asin'])
processed_title_df['title_processed_stem'] = pd.Series(processed_title_list)
Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/http2.py", line 116, in handle_async_request
await self._send_request_headers(request=request, stream_id=stream_id)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/http2.py", line 213, in _send_request_headers
self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
File "/opt/conda/lib/python3.10/site-packages/h2/connection.py", line 766, in send_headers
self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
File "/opt/conda/lib/python3.10/site-packages/h2/connection.py", line 228, in process_input
raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
return await self._connection.handle_async_request(request)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/http2.py", line 152, in handle_async_request
raise LocalProtocolError(exc) # pragma: nocover
httpcore.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
The above exception was the direct cause of the following exception:
httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
02:59:38 PM
prefect.flow_runs
Any suggestions?
ThanksDekel R
05/03/2023, 9:18 AMEncountered exception during execution:
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
raise exc
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
stream = await self._connect(request)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 150, in _connect
stream = await stream.start_tls(**kwargs)
File "/opt/conda/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 66, in start_tls
with map_exceptions(exc_map):
File "/opt/conda/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
raise to_exc(exc)
httpcore.ConnectError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
result = await flow_call.aresult()
File "/opt/conda/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
File "/opt/conda/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
result = self.fn(*self.args, **self.kwargs)
File "/app/flow.py", line 59, in flow
tf_idf_partition_var='cat_0').result()
File "/opt/conda/lib/python3.10/site-packages/prefect/futures.py", line 232, in result
).result()
File "/opt/conda/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/opt/conda/lib/python3.10/concurrent/futures/_base.py", line 458, in result
return self.__get_result()
File "/opt/conda/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/opt/conda/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/opt/conda/lib/python3.10/site-packages/prefect/futures.py", line 241, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/opt/conda/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/opt/conda/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 271, in wait
return await future.result(timeout=timeout)
File "/opt/conda/lib/python3.10/site-packages/distributed/client.py", line 329, in _result
raise exc.with_traceback(tb)
File "/opt/conda/lib/python3.10/site-packages/prefect/engine.py", line 1370, in begin_task_run
state = await orchestrate_task_run(
File "/opt/conda/lib/python3.10/site-packages/prefect/engine.py", line 1447, in orchestrate_task_run
flow_run = await client.read_flow_run(task_run.flow_run_id)
File "/opt/conda/lib/python3.10/site-packages/prefect/client/orchestration.py", line 1609, in read_flow_run
response = await self._client.get(f"/flow_runs/{flow_run_id}")
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1754, in get
return await self.request(
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/opt/conda/lib/python3.10/site-packages/prefect/client/base.py", line 246, in send
response = await self._send_with_retry(
File "/opt/conda/lib/python3.10/site-packages/prefect/client/base.py", line 192, in _send_with_retry
response = await request()
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1617, in send
response = await self._send_handling_auth(
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
response = await self._send_handling_redirects(
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
response = await self._send_single_request(request)
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1719, in _send_single_request
response = await transport.handle_async_request(request)
File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
with map_httpcore_exceptions():
File "/opt/conda/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.ConnectError
12:12:12 PM
prefect.flow_runs
Crash detected! Request to <https://api.prefect.cloud/api/accounts/0aceff1b-4fdb-4469-adde-197148a98672/workspaces/b7474c31-4033-449b-96b6-f3544e6f9408/flow_runs/8907a295-5335-4262-99fe-a6921dbba067> failed: Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
result = func(*args)
File "/opt/conda/lib/python3.10/ssl.py", line 975, in do_handshake
self._sslobj.do_handshake()
ssl.SSLSyscallError: Some I/O error occurred (_ssl.c:997)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
yield
File "/opt/conda/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 78, in start_tls
raise exc
File "/opt/conda/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 69, in start_tls
ssl_stream = await anyio.streams.tls.TLSStream.wrap(
File "/opt/conda/lib/python3.10/site-packages/anyio/streams/tls.py", line 122, in wrap
await wrapper._call_sslobject_method(ssl_object.do_handshake)
File "/opt/conda/lib/python3.10/site-packages/anyio/streams/tls.py", line 151, in _call_sslobject_method
raise BrokenResourceError from exc
anyio.BrokenResourceError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
raise exc
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
stream = await self._connect(request)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 150, in _connect
stream = await stream.start_tls(**kwargs)
File "/opt/conda/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 66, in start_tls
with map_exceptions(exc_map):
File "/opt/conda/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
raise to_exc(exc)
httpcore.ConnectError
The above exception was the direct cause of the following exception:
httpx.ConnectError
.