# ask-community
i
Hi - I have a `flow` that runs successfully with a `LocalExecutor`. However, when the `flow` is run using `flow.executor = DaskExecutor()`, the flow returns a `Failed to deserialize` error on the final `task`.
```
[2021-03-03 08:27:24-0600] INFO - prefect.TaskRunner | Task 'new-terms': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'new-terms': Finished task run for task with final state: 'Success'
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/home/ilivni/miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
  File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/tblib/pickling_support.py", line 26, in unpickle_exception
    inst = func(*args)
TypeError: __init__() missing 1 required positional argument: 'request'
```
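The last task is a `FilterTask`:

```python
from prefect.tasks.control_flow.filter import FilterTask

filter_res = FilterTask()

# Called inside the Flow context; `keywords` is defined earlier in the flow
res = filter_res(
    keywords,
    task_args={
        "name": "new-terms"
    }
)
```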
Any suggestions?
j
That looks like one of your tasks is raising an exception that fails to pickle. When running with Dask, your task results (or exceptions) may be serialized between processes, so they must be picklable.
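To illustrate that failure mode, here is a minimal, stdlib-only sketch (no Prefect or Dask involved) of an exception that pickles fine but cannot be unpickled. `RequestError` and its `request` argument are hypothetical stand-ins for whatever exception is actually being raised. The problem comes from `BaseException.__reduce__`, which rebuilds an exception as `cls(*self.args)`; here `self.args` only holds the message, so the constructor is called without `request`.

```python
import pickle


class RequestError(Exception):
    """Hypothetical exception whose constructor requires an extra argument."""

    def __init__(self, message, request):
        # Only `message` ends up in self.args; `request` is stored as an attribute.
        super().__init__(message)
        self.request = request


err = RequestError("upload failed", request={"url": "https://example.com"})

# Pickling succeeds: the payload records RequestError plus args=("upload failed",).
payload = pickle.dumps(err)

# Unpickling calls RequestError("upload failed"), which is missing `request`.
unpickle_error = None
try:
    pickle.loads(payload)
except TypeError as exc:
    unpickle_error = str(exc)

print(unpickle_error)
```

The printed message ends with the same `missing 1 required positional argument: 'request'` text as the traceback above (newer Python versions prefix it with the class name).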
i
@Jim Crist-Harif I added a downstream task to the task at issue. Now the downstream task is raising the same issue. The data being passed is a dictionary with a bunch of strings... Nothing fancy 🤔.
j
The traceback you're getting indicates there's an exception somewhere that takes a `request` argument to its constructor without adding appropriate pickle support.
Perhaps it's an exception from a third-party library? Are you using Prefect results anywhere? If you're using `S3Result`/`GCSResult`, an error there might lead to this behavior (not sure). The AWS and Google libraries are known to not always pickle things appropriately.
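Two common ways to deal with such an exception, sketched with a hypothetical `RequestError` (not from any real library): if you own the class, override `__reduce__` so unpickling passes every constructor argument back in; if the class comes from a third-party library, one option is to catch it inside the task and re-raise a builtin exception carrying only its message. `call_with_picklable_errors` and `failing_call` are hypothetical helper names, not Prefect or Dask APIs.

```python
import pickle


class RequestError(Exception):
    """Hypothetical exception that requires a `request` argument."""

    def __init__(self, message, request):
        super().__init__(message)
        self.request = request

    def __reduce__(self):
        # Tell pickle to rebuild the exception as RequestError(message, request).
        return (self.__class__, (self.args[0], self.request))


original = RequestError("upload failed", request={"url": "https://example.com"})
restored = pickle.loads(pickle.dumps(original))
print(type(restored).__name__, restored.args, restored.request)
# RequestError ('upload failed',) {'url': 'https://example.com'}


def call_with_picklable_errors(func, *args, **kwargs):
    """Hypothetical wrapper: re-raise any exception as a plain RuntimeError."""
    try:
        return func(*args, **kwargs)
    except Exception as exc:
        message = f"{type(exc).__name__}: {exc}"
    # Raised outside the except block so the original exception is not
    # attached as __context__; RuntimeError(message) round-trips through pickle.
    raise RuntimeError(message)


def failing_call():
    # Hypothetical stand-in for a third-party call that raises
    raise RequestError("upload failed", request={"url": "https://example.com"})


wrapped_error = None
try:
    call_with_picklable_errors(failing_call)
except RuntimeError as exc:
    wrapped_error = exc

print(wrapped_error)  # RequestError: upload failed
```

The wrapper trades away the original exception type and attributes for guaranteed picklability, which is usually acceptable when the goal is just to get a readable error back from a Dask worker.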
i
Hi @Jim Crist-Harif
- I am using `LocalResult`.
- I'm not sure if there is a better way to confirm that the task result is picklable, but here is what I tried: (1) run the flow with a local executor, (2) get the task's result, (3) pickle it, (4) unpickle it.
```python
import pickle

# Run the flow locally (make_nodes_fl and node_input_dict are defined elsewhere)
state = make_nodes_fl.run(
    parameters=dict(
        node_input_lst=[node_input_dict]
    )
)

# Get the result of the "new-terms" task
param_tsk = make_nodes_fl.get_tasks(name="new-terms")
new_params = state.result[param_tsk[0]].result

# Round-trip the result through pickle
my_pickled_object = pickle.dumps(new_params)
print(f"This is my pickled object:\n{my_pickled_object}\n")

my_unpickled_object = pickle.loads(my_pickled_object)
print(f"This is a_dict of the unpickled object:\n{my_unpickled_object}\n")
```
Which ran successfully:
```
This is a_dict of the unpickled object:
[{'term': 'binary', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic': 'computer science', 'term_id': 'binary-bit-bit-computer_science', 'edge_id': 'binary-bit', 'context_txt': 'a binary digit, having either the value zero or one, used to store or represent data.  a binary digit, generally represented as a one or zero.  the smallest unit of storage in a digital computer, consisting of a binary digit.', 'pos': 'ADJ', 'topic_id': 'bit-computer_science', 'keyword_score': 0.3039, 'resource_ids': {'term_id': 'binary-bit-bit-computer_science', 'term': 'binary', 'topic': 'computer science', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic_id': 'bit-computer_science'}}, {'term': 'digit', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic': 'computer science', 'term_id': 'digit-bit-bit-computer_science', 'edge_id': 'digit-bit', 'context_txt': 'a binary digit, having either the value zero or one, used to store or represent data.  a binary digit, generally represented as a one or zero.  the smallest unit of storage in a digital computer, consisting of a binary digit.', 'pos': 'NOUN', 'topic_id': 'bit-computer_science', 'keyword_score': 0.3401, 'resource_ids': {'term_id': 'digit-bit-bit-computer_science', 'term': 'digit', 'topic': 'computer science', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic_id': 'bit-computer_science'}}]
```
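The check above exercises the task's return value, while the traceback points at an exception being unpickled. A slightly broader, stdlib-only sketch can also round-trip whatever a function raises, so a task's underlying function can be tested directly without Dask. `check_roundtrip`, `run_and_check`, `BrokenRequestError`, `ok_task` and `bad_task` are all hypothetical names; the sample data is a trimmed version of the output above.

```python
import pickle


def check_roundtrip(obj):
    """Return None if obj survives pickle.dumps/pickle.loads, else a description of the failure."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception as exc:
        return f"{type(exc).__name__}: {exc}"
    return None


def run_and_check(func, *args, **kwargs):
    """Call func, then check that its result, or the exception it raised, round-trips through pickle."""
    try:
        value = func(*args, **kwargs)
    except Exception as exc:
        return "raised", check_roundtrip(exc)
    return "returned", check_roundtrip(value)


class BrokenRequestError(Exception):
    """Hypothetical exception with no pickle support for its `request` argument."""

    def __init__(self, message, request):
        super().__init__(message)
        self.request = request


def ok_task():
    # Plain lists/dicts/strings, like the "new-terms" output, round-trip fine
    return [{"term": "binary", "parent_term": "bit", "keyword_score": 0.3039}]


def bad_task():
    raise BrokenRequestError("boom", request="GET /")


print(run_and_check(ok_task))   # ('returned', None)
print(run_and_check(bad_task))  # ('raised', 'TypeError: ...') describing the missing 'request'
```

Returning a description instead of raising keeps the helper usable in a loop over several task functions when bisecting which one produces the unpicklable object.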
Hi @Jim Crist-Harif - Update: I am trying to come up with a toy example. While trying to isolate the problem, it looks like it begins with a `with case` statement. When all the tasks downstream of the `with case` are commented out, the `flow` runs fine with the `DaskExecutor`. However, any downstream `task` that is not commented out returns a `Failed to deserialize` error:
```
INFO:prefect.TaskRunner:Task 'new-terms': Finished task run for task with final state: 'Success'
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
  File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/tblib/pickling_support.py", line 26, in unpickle_exception
    inst = func(*args)
TypeError: __init__() missing 1 required positional argument: 'request'
```
Lastly, below is a partial visualization of the flow (which runs successfully locally), showing the section where the cause of the error might be. My next step is perhaps to convert this to an `ifelse` statement and see if that works with the `DaskExecutor`. Or is that not helpful?
j
I think the most helpful thing for us would be to have a reproducible example. There's nothing in a `case` statement specifically that would cause this issue.
šŸ‘ 1