Mikhail Akimov
09/03/2019, 12:18 PMChris White
Mikhail Akimov
09/03/2019, 3:41 PMChris White
J Grover
09/03/2019, 11:25 PMChris White
TypeError
is raised within a task:
def state_handler(tt, old_state, new_state):
if new_state.is_failed() and isinstance(new_state.result, TypeError):
# do something custom
return new_state
if you really needed to talk back to the Flow’s process for some reason, I would recommend directly utilizing dask’s run_on_scheduler
method: https://distributed.dask.org/en/latest/api.html#distributed.Client.run_on_schedulerJ Grover
09/04/2019, 12:11 AMChris White
Mikhail Akimov
09/04/2019, 9:07 AM<http://requests.post|requests.post>()
, I'd like to handle it all in one place. The buffering only make sense if the number of buffers is relatively low. Right now, it's 1 queue for flow + 1 attached to each worker. This works, but I'm unable to persist buffered state changes across worker restarts.Chris White
Mikhail Akimov
09/04/2019, 3:05 PM