https://prefect.io logo
Title
d

Dekel R

04/30/2023, 1:49 PM
Hey all, I have a Prefect 2 flow using `DaskTaskRunner` (reading data from multiple unrelated sources, preprocessing it, and then training a model). I also use `map` in the flow to parse some objects in parallel. Everything works fine until I hit this Dask-related error:
12:36:08.900 | INFO    | Flow run 'logical-peacock' - Created task run 'calculate_additional_features-0' for task 'calculate_additional_features'
2023-04-30 12:36:08,915 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
What is the problem here? What am I missing? I'm sharing my flow code as a comment on this message. Setup: running locally with PyCharm's Docker Compose interpreter, Prefect 2.10.4, prefect-dask==0.2.3. Thanks.
@Ofek Katriel FYI
from prefect import flow, get_run_logger, unmapped
from src.transformatio_tasks import get_asins_to_transform, load_model_embedding_df,\
    check_existing_transformed_asins, get_pdp_data, load_nth_recent_file_from_storage,\
    get_titles_without_brand_and_unit_words, sentence_preprocess, calculate_additional_features
from alerts import send_slack_alert
from filter_layer_logic import save_to_partiotions
from src.adp_tasks import missing_asins_request
from math import ceil
import pandas as pd
from datetime import datetime
from src.configurations import PRODUCTION_BUCKET, TF_IDF_FOLDER, TRANSFORM_BATCH_SIZE
from prefect_dask.task_runners import DaskTaskRunner


@flow(
    name='similarity_filter_layer',
    description='Preprocessing for similarity filter layer',
    retries=2,
    task_runner=DaskTaskRunner()
)
def flow(
        is_onboarding: bool = False,
        onboarding_asins_path: str = None
):
    """Run the similarity-filter-layer preprocessing pipeline.

    Loads the ASINs required by the model, determines which of them still
    need transforming, fetches their PDP data and transforms it in batches of
    ``TRANSFORM_BATCH_SIZE`` rows. The concatenated result is stamped with a
    ``transform_ts`` column, handed to ``save_to_partiotions``, and a Slack
    alert reports whether that task failed.

    Args:
        is_onboarding: Accepted for interface compatibility; not read by the
            flow body.
        onboarding_asins_path: Accepted for interface compatibility; not read
            by the flow body.
    """
    logger = get_run_logger()
    logger.info("Starting filter layer preprocessing")
    # missing_asins_request.submit()

    required_model_asins = load_model_embedding_df.submit()
    existing_transformed_asins = check_existing_transformed_asins.submit(asins_to_check=required_model_asins)
    asins_to_transform = get_asins_to_transform.submit(
        required_model_asins=required_model_asins,
        existing_transformed_asins=existing_transformed_asins,
    )
    logger.info('Starting transform process')
    # The row count is needed to plan the batches, so wait for the data here.
    pdp_df = get_pdp_data.submit(asins_to_transform=asins_to_transform).result()
    # Kept as a future: Prefect resolves it when it is passed as a task argument.
    tf_idf_dict = load_nth_recent_file_from_storage.submit(
        bucket=PRODUCTION_BUCKET, folder=TF_IDF_FOLDER, file_rank=1
    )
    total_batches = ceil(len(pdp_df) / TRANSFORM_BATCH_SIZE)
    transform_df_list = []
    logger.info("Starting transform per batch loop")

    for i in range(total_batches):
        pdp_slice = pdp_df.iloc[i * TRANSFORM_BATCH_SIZE:(i + 1) * TRANSFORM_BATCH_SIZE]
        df, title_without_brand_list, unit_words = get_titles_without_brand_and_unit_words.submit(
            input_df=pdp_slice
        ).result()
        title_futures = sentence_preprocess.map(
            unit_words=unmapped(unit_words),
            is_stem=unmapped(True),
            sentence=title_without_brand_list,
        )
        # `map` returns PrefectFutures. Prefect resolves futures found directly
        # in task arguments and in standard collections, but not inside a
        # DataFrame. Storing the futures in a column would leave live futures in
        # the frame, and Dask then fails to pickle it ("Failed to serialize
        # <ToPickle: HighLevelGraph ...>"). Resolve them to plain values first.
        processed_title_list = [future.result() for future in title_futures]
        processed_title_df = pd.DataFrame(df['asin'].to_list(), columns=['asin'])
        processed_title_df['title_processed_stem'] = pd.Series(processed_title_list)
        transform_df = calculate_additional_features.submit(
            df=pdp_slice,
            tf_idf_dict=tf_idf_dict,
            tf_idf_partition_var='cat_0',
            processed_title_df=processed_title_df,
        ).result()

        logger.info(f'Transform batch {i + 1} / {total_batches} done')
        transform_df_list.append(transform_df)

    logger.info("Finished transforming")
    # NOTE(review): if pdp_df is empty, total_batches is 0 and pd.concat([])
    # raises ValueError. Decide whether "nothing to transform" should succeed.
    full_transformed_df = pd.concat(transform_df_list)
    full_transformed_df['transform_ts'] = datetime.now()

    # NOTE(review): `wait_for` is consumed by `submit` as an upstream-dependency
    # list and is not passed to the task function, so the DataFrame is not
    # handed to save_to_partiotions as an argument here. Confirm how that task
    # receives its input.
    saved_to_partitions = save_to_partiotions.submit(wait_for=full_transformed_df)
    saved_to_partitions_state = saved_to_partitions.wait()

    if saved_to_partitions_state.is_failed():
        logger.error("The filter layer flow failed! Look at the logs")
        send_slack_alert("The filter layer flow failed! Look at the logs")
    else:
        logger.info("The filter layer flow finished running!")
        send_slack_alert("The filter layer flow finished running!")

if __name__ == "__main__":
    # Script entry point: invoke the flow directly with its default parameters.
    flow()
I get the error on this line:
transform_df = calculate_additional_features.submit(df=pdp_slice, tf_idf_dict=tf_idf_dict, tf_idf_partition_var='cat_0',
                                      processed_title_df=processed_title_df).result()
I solved this by moving some of the native Python code into a task instead of running it in the flow context, specifically the following lines:
processed_title_df = pd.DataFrame(df['asin'].to_list(), columns=['asin'])
        processed_title_df['title_processed_stem'] = pd.Series(processed_title_list)
After running the flow locally, I deployed it and got the following error after the `map` call I'm using:
Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
    func, target_state = self._transitions[(self.state, input_)]
KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.SEND_HEADERS: 0>)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/http2.py", line 116, in handle_async_request
    await self._send_request_headers(request=request, stream_id=stream_id)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/http2.py", line 213, in _send_request_headers
    self._h2_state.send_headers(stream_id, headers, end_stream=end_stream)
  File "/opt/conda/lib/python3.10/site-packages/h2/connection.py", line 766, in send_headers
    self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
  File "/opt/conda/lib/python3.10/site-packages/h2/connection.py", line 228, in process_input
    raise ProtocolError(
h2.exceptions.ProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/http2.py", line 152, in handle_async_request
    raise LocalProtocolError(exc)  # pragma: nocover
httpcore.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED

The above exception was the direct cause of the following exception:

httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
02:59:38 PM
prefect.flow_runs
Any suggestions? Thanks!
l

Luis Cebrián

05/02/2023, 1:38 PM
I got the same error today. This issue helped me, specifically this comment.
👀 1
Regarding your Dask error, I'm afraid I don't have enough Dask experience to help you.
d

Dekel R

05/03/2023, 9:18 AM
Thanks, I did that, and now I get this error:
Encountered exception during execution:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
    raise exc
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
    stream = await self._connect(request)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 150, in _connect
    stream = await stream.start_tls(**kwargs)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 66, in start_tls
    with map_exceptions(exc_map):
  File "/opt/conda/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ConnectError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/opt/conda/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/opt/conda/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/app/flow.py", line 59, in flow
    tf_idf_partition_var='cat_0').result()
  File "/opt/conda/lib/python3.10/site-packages/prefect/futures.py", line 232, in result
    ).result()
  File "/opt/conda/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/opt/conda/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/opt/conda/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/opt/conda/lib/python3.10/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/opt/conda/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/opt/conda/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 271, in wait
    return await future.result(timeout=timeout)
  File "/opt/conda/lib/python3.10/site-packages/distributed/client.py", line 329, in _result
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.10/site-packages/prefect/engine.py", line 1370, in begin_task_run
    state = await orchestrate_task_run(
  File "/opt/conda/lib/python3.10/site-packages/prefect/engine.py", line 1447, in orchestrate_task_run
    flow_run = await client.read_flow_run(task_run.flow_run_id)
  File "/opt/conda/lib/python3.10/site-packages/prefect/client/orchestration.py", line 1609, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
  File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1754, in get
    return await self.request(
  File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/opt/conda/lib/python3.10/site-packages/prefect/client/base.py", line 246, in send
    response = await self._send_with_retry(
  File "/opt/conda/lib/python3.10/site-packages/prefect/client/base.py", line 192, in _send_with_retry
    response = await request()
  File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1617, in send
    response = await self._send_handling_auth(
  File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1719, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
    with map_httpcore_exceptions():
  File "/opt/conda/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ConnectError
12:12:12 PM
prefect.flow_runs
Crash detected! Request to <https://api.prefect.cloud/api/accounts/0aceff1b-4fdb-4469-adde-197148a98672/workspaces/b7474c31-4033-449b-96b6-f3544e6f9408/flow_runs/8907a295-5335-4262-99fe-a6921dbba067> failed: Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
    result = func(*args)
  File "/opt/conda/lib/python3.10/ssl.py", line 975, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLSyscallError: Some I/O error occurred (_ssl.c:997)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
    yield
  File "/opt/conda/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 78, in start_tls
    raise exc
  File "/opt/conda/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 69, in start_tls
    ssl_stream = await anyio.streams.tls.TLSStream.wrap(
  File "/opt/conda/lib/python3.10/site-packages/anyio/streams/tls.py", line 122, in wrap
    await wrapper._call_sslobject_method(ssl_object.do_handshake)
  File "/opt/conda/lib/python3.10/site-packages/anyio/streams/tls.py", line 151, in _call_sslobject_method
    raise BrokenResourceError from exc
anyio.BrokenResourceError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
    raise exc
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
    stream = await self._connect(request)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 150, in _connect
    stream = await stream.start_tls(**kwargs)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 66, in start_tls
    with map_exceptions(exc_map):
  File "/opt/conda/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/opt/conda/lib/python3.10/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ConnectError

The above exception was the direct cause of the following exception:

httpx.ConnectError
.