Mikhail Akimov
09/03/2019, 12:18 PMChris White
09/03/2019, 3:12 PMMikhail Akimov
09/03/2019, 3:41 PMChris White
09/03/2019, 5:26 PMJ Grover
09/03/2019, 11:25 PMChris White
09/03/2019, 11:55 PMTypeError
is raised within a task:
def state_handler(tt, old_state, new_state):
if new_state.is_failed() and isinstance(new_state.result, TypeError):
# do something custom
return new_state
if you really needed to talk back to the Flow’s process for some reason, I would recommend directly utilizing dask’s run_on_scheduler
method: https://distributed.dask.org/en/latest/api.html#distributed.Client.run_on_schedulerJ Grover
09/04/2019, 12:11 AMChris White
09/04/2019, 12:13 AMMikhail Akimov
09/04/2019, 9:07 AM<http://requests.post|requests.post>()
, I'd like to handle it all in one place. The buffering only make sense if the number of buffers is relatively low. Right now, it's 1 queue for flow + 1 attached to each worker. This works, but I'm unable to persist buffered state changes across worker restarts.Chris White
09/04/2019, 2:46 PMMikhail Akimov
09/04/2019, 3:05 PM