itay livni
03/03/2021, 4:22 PM
flow that runs successfully with a LocalExecutor. However, when the flow is run using flow.executor = DaskExecutor(), it returns a Failed to deserialize error on the final task.
[2021-03-03 08:27:24-0600] INFO - prefect.TaskRunner | Task 'new-terms': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'new-terms': Finished task run for task with final state: 'Success'
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/core.py", line 151, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
return loads(header, frames)
File "/home/ilivni/miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
return pickle.loads(x, buffers=buffers)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
return pickle.loads(x)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/tblib/pickling_support.py", line 26, in unpickle_exception
inst = func(*args)
TypeError: __init__() missing 1 required positional argument: 'request'
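The last frame above is tblib's unpickle_exception, which rebuilds a pickled exception by calling its class with the arguments saved when it was pickled. A minimal stdlib-only sketch of that failure mode, for an exception whose constructor requires a request argument but forwards only the message to Exception.__init__ (ServiceError and roundtrip are hypothetical names, not classes from this flow):

```python
import pickle


class ServiceError(Exception):
    """Hypothetical exception whose constructor requires a `request` argument."""

    def __init__(self, message, request):
        # Only `message` reaches Exception.__init__, so self.args == (message,).
        super().__init__(message)
        self.request = request


def roundtrip(obj):
    """Pickle and unpickle `obj`, roughly what happens when Dask moves it between processes."""
    return pickle.loads(pickle.dumps(obj))


err = ServiceError("lookup failed", "GET /terms")
try:
    roundtrip(err)
except TypeError as exc:
    # Unpickling calls ServiceError(*err.args), i.e. ServiceError("lookup failed").
    print(exc)
```

Pickling succeeds; it is unpickling that calls the class without request and raises the same TypeError as the traceback.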
The last task is a FilterTask:
filter_res = FilterTask()
res = filter_res(
keywords,
task_args={
"name": "new-terms"
}
)
Any suggestions?
Jim Crist-Harif
03/03/2021, 4:40 PM
itay livni
03/03/2021, 5:03 PM
Jim Crist-Harif
03/03/2021, 5:05 PM
request argument to its constructor without adding appropriate pickle support.
Jim Crist-Harif
03/03/2021, 5:06 PM
itay livni
03/03/2021, 6:07 PM
LocalResult
• Not sure if there is a better way to confirm that the task result is picklable, but here is what I tried: (1) run the flow with a local executor, (2) get the task result, (3) pickle it, (4) unpickle it.
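import pickle

# Run the flow in-process (LocalExecutor) and keep the final flow state
state = make_nodes_fl.run(
    parameters=dict(
        node_input_lst=[node_input_dict]
    )
)
# Get results: look up the "new-terms" task and read the value from its task state
param_tsk = make_nodes_fl.get_tasks(name="new-terms")
new_params = state.result[param_tsk[0]].result
# Round-trip the value through pickle, roughly as Dask would between processes
my_pickled_object = pickle.dumps(new_params)
print(f"This is my pickled object:\n{my_pickled_object}\n")
my_unpickled_object = pickle.loads(my_pickled_object)
print(f"This is a_dict of the unpickled object:\n{my_unpickled_object}\n")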
Which ran successfully:
This is a_dict of the unpickled object:
[{'term': 'binary', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic': 'computer science', 'term_id': 'binary-bit-bit-computer_science', 'edge_id': 'binary-bit', 'context_txt': 'a binary digit, having either the value zero or one, used to store or represent data. a binary digit, generally represented as a one or zero. the smallest unit of storage in a digital computer, consisting of a binary digit.', 'pos': 'ADJ', 'topic_id': 'bit-computer_science', 'keyword_score': 0.3039, 'resource_ids': {'term_id': 'binary-bit-bit-computer_science', 'term': 'binary', 'topic': 'computer science', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic_id': 'bit-computer_science'}}, {'term': 'digit', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic': 'computer science', 'term_id': 'digit-bit-bit-computer_science', 'edge_id': 'digit-bit', 'context_txt': 'a binary digit, having either the value zero or one, used to store or represent data. a binary digit, generally represented as a one or zero. the smallest unit of storage in a digital computer, consisting of a binary digit.', 'pos': 'NOUN', 'topic_id': 'bit-computer_science', 'keyword_score': 0.3401, 'resource_ids': {'term_id': 'digit-bit-bit-computer_science', 'term': 'digit', 'topic': 'computer science', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic_id': 'bit-computer_science'}}]
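The check above only exercises the successful return value of 'new-terms'. Since the traceback ends in tblib's unpickle_exception, the object failing to deserialize looks like an exception rather than this list of dicts, so a round-trip check is more telling when it also covers exception instances the tasks might raise. A hypothetical helper, using plain pickle as an approximation of what Dask does (find_unpicklable and NeedsRequest are illustrative names; NeedsRequest stands in for an exception whose constructor requires request):

```python
import pickle


def find_unpicklable(objects):
    """Return (object, error) pairs for objects that fail a pickle round trip.

    Hypothetical helper: pickles and immediately unpickles each object, an
    approximation of shipping a result or exception between Dask processes.
    """
    failures = []
    for obj in objects:
        try:
            pickle.loads(pickle.dumps(obj))
        except Exception as exc:  # either dumps or loads may fail
            failures.append((obj, exc))
    return failures


class NeedsRequest(Exception):
    # Hypothetical exception: `request` is required but not kept in self.args.
    def __init__(self, message, request):
        super().__init__(message)
        self.request = request


candidates = [
    [{"term": "binary", "parent_term": "bit"}],   # a plain result value
    ValueError("bad input"),                       # a stdlib exception
    NeedsRequest("lookup failed", "GET /terms"),   # the problematic pattern
]
for obj, exc in find_unpicklable(candidates):
    print(type(obj).__name__, "->", exc)
```

Only the NeedsRequest instance should be reported; the list of dicts and the ValueError survive the round trip.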
itay livni
03/05/2021, 5:08 PM
with case statement.
When all the tasks downstream of the with case are commented out, the flow runs fine with the DaskExecutor. However, any downstream task that is not commented out returns a Failed to deserialize error:
INFO:prefect.TaskRunner:Task 'new-terms': Finished task run for task with final state: 'Success'
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/core.py", line 151, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
return loads(header, frames)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
return pickle.loads(x, buffers=buffers)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
return pickle.loads(x)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/tblib/pickling_support.py", line 26, in unpickle_exception
inst = func(*args)
TypeError: __init__() missing 1 required positional argument: 'request'
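If the offending exception class is under your control, one common remedy for the problem Jim described (a request argument in the constructor without pickle support) is to define __reduce__ so that unpickling receives every constructor argument. A minimal sketch; ServiceError is a hypothetical class name:

```python
import pickle


class ServiceError(Exception):
    """Hypothetical exception that keeps `request` through pickling."""

    def __init__(self, message, request):
        super().__init__(message)
        self.message = message
        self.request = request

    def __reduce__(self):
        # Tell pickle to rebuild the object as ServiceError(message, request).
        return (self.__class__, (self.message, self.request))


err = ServiceError("lookup failed", "GET /terms")
restored = pickle.loads(pickle.dumps(err))
print(type(restored).__name__, restored.args, restored.request)
```

__reduce__ is the standard hook pickle consults to rebuild an object; storing message and request on the instance keeps the reconstruction independent of what ends up in self.args.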
Lastly, below is a partial visualization of the flow, executed locally and successfully, showing where the cause of the error might be. My next step is perhaps to convert this to an ifelse statement and see if that works with the DaskExecutor, or is that not helpful?
Jim Crist-Harif
03/05/2021, 7:29 PM
case statement specifically that would cause this issue.