itay livni
03/03/2021, 4:22 PM
I have a flow that runs successfully with a LocalExecutor. However, when the flow is run using flow.executor = DaskExecutor(), the flow returns a Failed to deserialize error on the final task:
[2021-03-03 08:27:24-0600] INFO - prefect.TaskRunner | Task 'new-terms': Finished task run for task with final state: 'Success'
INFO:prefect.TaskRunner:Task 'new-terms': Finished task run for task with final state: 'Success'
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/core.py", line 151, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
return loads(header, frames)
File "/home/ilivni/miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
return pickle.loads(x, buffers=buffers)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
return pickle.loads(x)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/tblib/pickling_support.py", line 26, in unpickle_exception
inst = func(*args)
TypeError: __init__() missing 1 required positional argument: 'request'
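
For context, a minimal sketch of the executor switch being described above, assuming Prefect 0.14-style imports (the flow name and body here are illustrative, not the actual flow):

from prefect import Flow
from prefect.executors import DaskExecutor, LocalExecutor

with Flow("make-nodes") as flow:
    ...  # tasks elided

flow.executor = LocalExecutor()   # runs cleanly
# flow.executor = DaskExecutor()  # reproduces the "Failed to deserialize" error above
flow.run()
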
The last task is a FilterTask:
from prefect.tasks.control_flow import FilterTask

filter_res = FilterTask()
res = filter_res(
    keywords,
    task_args={"name": "new-terms"},
)
Any suggestions?

Jim Crist-Harif
03/03/2021, 4:40 PM

itay livni
03/03/2021, 5:03 PM

Jim Crist-Harif
03/03/2021, 5:05 PM
…request argument to its constructor without adding appropriate pickle support.
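
To illustrate the failure mode being described, a standalone sketch (the exception class below is made up and is not the actual class involved): an exception whose constructor requires an argument that is not passed to super().__init__() will not survive a pickle round trip unless it also defines __reduce__.

import pickle

class BadHTTPError(Exception):
    # Only `message` ends up in self.args, which is all that pickle
    # records for exceptions by default, so `request` is lost.
    def __init__(self, message, request):
        super().__init__(message)
        self.request = request

try:
    pickle.loads(pickle.dumps(BadHTTPError("boom", request="GET /terms")))
except TypeError as exc:
    print(exc)  # __init__() missing 1 required positional argument: 'request'

class GoodHTTPError(Exception):
    def __init__(self, message, request):
        super().__init__(message)
        self.request = request

    def __reduce__(self):
        # Tell pickle how to rebuild the exception with both constructor arguments.
        return (self.__class__, (self.args[0], self.request))

print(pickle.loads(pickle.dumps(GoodHTTPError("boom", request="GET /terms"))))
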

Jim Crist-Harif
03/03/2021, 5:06 PM

itay livni
03/03/2021, 6:07 PM
LocalResult
• Not sure if there is a better way to confirm that the task result is picklable, but here is what I tried: (1) run the flow with a local executor, (2) get the task result, (3) pickle it, (4) unpickle it.
state = make_nodes_fl.run(
parameters=dict(
node_input_lst=[node_input_dict]
)
)
# Get results
param_tsk = make_nodes_fl.get_tasks("new-terms")
new_params = state.result[param_tsk[0]]._result.value
my_pickled_object = pickle.dumps(new_params)
print(f"This is my pickled object:\n{my_pickled_object}\n")
my_unpickled_object = pickle.loads(my_pickled_object)
print(f"This is a_dict of the unpickled object:\n{my_unpickled_object}\n")
Which ran successfully:
This is a_dict of the unpickled object:
[{'term': 'binary', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic': 'computer science', 'term_id': 'binary-bit-bit-computer_science', 'edge_id': 'binary-bit', 'context_txt': 'a binary digit, having either the value zero or one, used to store or represent data. a binary digit, generally represented as a one or zero. the smallest unit of storage in a digital computer, consisting of a binary digit.', 'pos': 'ADJ', 'topic_id': 'bit-computer_science', 'keyword_score': 0.3039, 'resource_ids': {'term_id': 'binary-bit-bit-computer_science', 'term': 'binary', 'topic': 'computer science', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic_id': 'bit-computer_science'}}, {'term': 'digit', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic': 'computer science', 'term_id': 'digit-bit-bit-computer_science', 'edge_id': 'digit-bit', 'context_txt': 'a binary digit, having either the value zero or one, used to store or represent data. a binary digit, generally represented as a one or zero. the smallest unit of storage in a digital computer, consisting of a binary digit.', 'pos': 'NOUN', 'topic_id': 'bit-computer_science', 'keyword_score': 0.3401, 'resource_ids': {'term_id': 'digit-bit-bit-computer_science', 'term': 'digit', 'topic': 'computer science', 'parent_term': 'bit', 'user_input_term': 'bit', 'topic_id': 'bit-computer_science'}}]
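
As a check that is a bit closer to what the Dask workers actually do, the same round trip could also be tried with cloudpickle, which distributed falls back to when plain pickle cannot handle a payload (just a sketch, reusing new_params from the snippet above):

import cloudpickle

# Same round-trip check as above, but through cloudpickle.
roundtripped = cloudpickle.loads(cloudpickle.dumps(new_params))
print(roundtripped == new_params)
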

itay livni
03/05/2021, 5:08 PM
…with case statement.
When all the tasks downstream of the with case are commented out, the flow runs fine with the DaskExecutor. However, any downstream task that is not commented out returns a Failed to deserialize error:
INFO:prefect.TaskRunner:Task 'new-terms': Finished task run for task with final state: 'Success'
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/core.py", line 151, in loads
value = _deserialize(head, fs, deserializers=deserializers)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
return loads(header, frames)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
return pickle.loads(x, buffers=buffers)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
return pickle.loads(x)
File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/tblib/pickling_support.py", line 26, in unpickle_exception
inst = func(*args)
TypeError: __init__() missing 1 required positional argument: 'request'
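
For reference, a minimal sketch of the structure being described above, assuming Prefect 0.x (the condition and task names are illustrative; the real flow is more involved):

from prefect import Flow, case, task

@task
def should_refresh():
    return True

@task
def new_terms():
    return ["binary", "digit"]

@task
def filter_keywords(keywords):
    return [k for k in keywords if k]

with Flow("make-nodes") as flow:
    cond = should_refresh()
    with case(cond, True):
        keywords = new_terms()

    # Tasks downstream of the case block: commenting these out made the
    # DaskExecutor run succeed; leaving any in reproduced the error.
    filtered = filter_keywords(keywords)
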
Lastly, below is a partial visualization of the flow that executes locally and successfully, i.e. where the cause of the error might be. My next step is perhaps to convert this to an ifelse statement and see if that works with the DaskExecutor, or is that not helpful?
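
A sketch of the ifelse form being considered, again assuming Prefect 0.x control-flow imports (task names are illustrative):

from prefect import Flow, task
from prefect.tasks.control_flow import ifelse, merge

@task
def should_refresh():
    return True

@task
def new_terms():
    return ["binary", "digit"]

@task
def cached_terms():
    return []

with Flow("make-nodes-ifelse") as flow:
    cond = should_refresh()
    new = new_terms()
    cached = cached_terms()
    # ifelse skips whichever branch does not match the condition;
    # merge then passes along the result of the branch that ran.
    ifelse(cond, new, cached)
    keywords = merge(new, cached)
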

Jim Crist-Harif
03/05/2021, 7:29 PM
…case statement specifically that would cause this issue.