
Mikhail Akimov

09/03/2019, 12:18 PM
Where do the task state_handlers get called: in the main flow (scheduler) process, or on the workers (dask, subprocess, etc.)? I'm passing an object to my task state_handlers through a closure, and with the sequential/threads LocalDaskExecutor they work fine, but with processes it's clear that the passed object isn't the same instance for all task runs. I was under the impression that the scheduler catches state changes and calls all the callbacks, while the executor is only responsible for, well, the execution of the work.

Chris White

09/03/2019, 3:12 PM
They get called on the dask workers. The executor is responsible for submitting the individual Runner pipelines for work; Runner pipelines include such things as managing state transitions for the individual task that was submitted.

Mikhail Akimov

09/03/2019, 3:41 PM
Thanks for the clarification! So task states are not reported back to the original process? I'm trying to buffer messages going from Prefect to my storage backend, and this buffer only makes sense if it's relatively long-lived (minutes and hours, not seconds).
I guess I can just attach it to each worker with register_worker_callbacks, but then I'd have to get at the same object from the state handler code.
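For what it's worth, a minimal sketch of that pattern, assuming `register_worker_callbacks` injects the `dask_worker` keyword the way `Client.run` does, and that state handlers can reach the local worker via `get_worker()` (the `api_buffer` attribute is a made-up name):

from collections import deque

from distributed import Client, get_worker


def attach_buffer(dask_worker):
    # Runs once per worker at startup: hang a buffer off the worker object
    dask_worker.api_buffer = deque()


client = Client()  # assumption: the same cluster the flow's executor uses
client.register_worker_callbacks(attach_buffer)


def state_handler(task, old_state, new_state):
    # State handlers run on the workers, so get_worker() finds the local
    # worker and, through it, the buffer attached above
    get_worker().api_buffer.append(new_state)
    return new_state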

Chris White

09/03/2019, 5:26 PM
final task states are always reported back to the original process, but intermediate ones are not necessarily

J Grover

09/03/2019, 11:25 PM
@Chris White Is there a recommended way of passing these intermediate states back to the flow process? Say, if we wanted to take an action on the flow based on whether an exception occurred in a task or not.

Chris White

09/03/2019, 11:55 PM
Hi @J Grover - for taking action based on an exception within a task, you could use a `state_handler`; for example, to do something custom if a `TypeError` is raised within a task:

def state_handler(task, old_state, new_state):
    # on failure, the exception raised by the task is stored on the state's result
    if new_state.is_failed() and isinstance(new_state.result, TypeError):
        pass  # do something custom
    return new_state
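To wire the handler up, you pass it when defining the task; a minimal sketch using the `state_handlers` keyword of the task decorator (`my_task` is just a placeholder):

from prefect import task


@task(state_handlers=[state_handler])
def my_task():
    # the handler above now fires on every state change of this task
    pass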
if you really needed to talk back to the Flow’s process for some reason, I would recommend directly utilizing dask’s `run_on_scheduler` method: https://distributed.dask.org/en/latest/api.html#distributed.Client.run_on_scheduler
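A rough sketch of that route, assuming the handler executes on a dask worker where `get_client()` can reach the cluster (the `notify_scheduler` function and the `flow_events` attribute are made-up names):

from distributed import get_client


def notify_scheduler(payload, dask_scheduler=None):
    # Runs inside the scheduler process; dask populates the dask_scheduler
    # keyword with the scheduler itself. Stash the payload somewhere the
    # flow process could poll for it.
    events = getattr(dask_scheduler, "flow_events", [])
    events.append(payload)
    dask_scheduler.flow_events = events


def state_handler(task, old_state, new_state):
    if new_state.is_failed():
        # get_client() works from inside a running dask task
        get_client().run_on_scheduler(notify_scheduler, repr(new_state))
    return new_state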

J Grover

09/04/2019, 12:11 AM
I am more after the latter. So there isn't a native Prefect way of propagating such task state changes to the main process while the flow is running?

Chris White

09/04/2019, 12:13 AM
Honestly we’ve never even considered it - could you explain your use case for needing this?

Mikhail Akimov

09/04/2019, 9:07 AM
Well, in my case, the flow and its tasks report their state changes to a server via an API. The API client wrapper is buffered: I add messages to a deque and then try to send them, so they are kept there in case the webserver is down. If the flow is stopped, I persist the deque to a file that is loaded on the next flow.run() call, so theoretically no state is lost and eventually all messages reach the server. Because communication with the server is more complex than just calling `requests.post()`, I'd like to handle it all in one place. The buffering only makes sense if the number of buffers is relatively low; right now it's 1 queue for the flow + 1 attached to each worker. This works, but I'm unable to persist buffered state changes across worker restarts.
So I'm doing something similar to this with state handlers. It would make more sense to work with task state handlers the same way I work with the flow state handler.
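A condensed sketch of the buffered client described above (the class and path names are illustrative, not the actual code):

import pickle
from collections import deque
from pathlib import Path

import requests


class BufferedReporter:
    # Queues state-change messages and flushes them whenever the server is up
    def __init__(self, url, path="state_buffer.pkl"):
        self.url = url
        self.path = Path(path)
        # Reload anything a previous run left behind, so no state is lost
        self.buffer = pickle.loads(self.path.read_bytes()) if self.path.exists() else deque()

    def report(self, message):
        self.buffer.append(message)
        self.flush()

    def flush(self):
        while self.buffer:
            try:
                requests.post(self.url, json=self.buffer[0], timeout=5)
            except requests.RequestException:
                break  # webserver down; keep the messages buffered
            self.buffer.popleft()

    def persist(self):
        # Called when the flow stops; the next flow.run() picks the file up
        self.path.write_bytes(pickle.dumps(self.buffer))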

Chris White

09/04/2019, 2:46 PM
You could put your API payloads into a distributed Queue (https://distributed.dask.org/en/latest/api.html#distributed.Queue), and then read off the queue in a background thread in the flow’s process
👍 1
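For reference, a sketch of that approach; the queue name and the `drain_queue` helper are illustrative, and it assumes the handlers run on workers attached to the same cluster:

import threading

from distributed import Client, Queue, get_client


def state_handler(task, old_state, new_state):
    # Producer side, on the workers: push the payload onto a named queue,
    # using the cluster client available inside a running task
    Queue("api-payloads", client=get_client()).put(repr(new_state))
    return new_state


def drain_queue(client):
    # Consumer side, in the flow's process: read payloads off the queue in
    # a background thread and hand them to the single long-lived buffer
    queue = Queue("api-payloads", client=client)
    while True:
        payload = queue.get()  # blocks until a payload arrives
        print(payload)  # stand-in for the buffered API client


client = Client()  # assumption: the same cluster the flow's executor uses
threading.Thread(target=drain_queue, args=(client,), daemon=True).start()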

Mikhail Akimov

09/04/2019, 3:05 PM
This is super cool, thank you for directing me there. I thought an external queue would be a solution, but I didn't want to deploy a standalone MQ just for this.
👍 1