Matthew Seligson

    6 months ago
    Do state handlers capture state changes induced by setting state via the UI?
    I have a flow run which completed successfully. I then restarted the final task, let it get into a running state, then marked it as failed. Yet it did not activate our state handler for handling task failures.
    Kevin Kho

    6 months ago
    For the most part they do not, because the change doesn’t pass through the Flow. The UI sets the state directly in the database without going through the Flow. I think only Cancellation states pass through, because cancellation has to communicate with the Flow. I think I tested this, and Failed states set from the UI don’t go through state handlers.
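    For reference, a minimal sketch of the kind of failure handler under discussion, assuming the Prefect 1.x handler signature (obj, old_state, new_state). The failed_tasks list and the SimpleNamespace stand-ins are hypothetical and exist only so the sketch runs without Prefect installed. A handler like this is called by the runner process, which is why a state written straight to the database from the UI would not reach it.

```python
from types import SimpleNamespace

failed_tasks = []  # hypothetical sink; a real handler might send an alert instead


def notify_on_failure(task, old_state, new_state):
    """State handler following the Prefect 1.x signature (obj, old_state, new_state)."""
    # Called by the runner on a state transition it performs itself.
    if new_state.is_failed():
        failed_tasks.append(task.name)
    # Returning the new state unchanged lets the transition proceed as-is.
    return new_state


# Stand-ins so the sketch runs without Prefect (hypothetical, not Prefect classes).
fake_task = SimpleNamespace(name="final_task")
running = SimpleNamespace(is_failed=lambda: False)
failed = SimpleNamespace(is_failed=lambda: True)

notify_on_failure(fake_task, running, failed)
```

    In Prefect 1.x a handler like this would typically be attached with @task(state_handlers=[notify_on_failure]).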
    Matthew Seligson

    6 months ago
    I see. Thanks Kevin!
    In this same vein, does cancelling a flow also set the running task states to “cancelled”? I’d like to have state handlers for my tasks that are triggered when they are cancelled.
    Kevin Kho

    5 months ago
    In my experience, a state handler at the Flow level works but one at the task level does not, because the task states are just set directly in the database. See this.
    Matthew Seligson

    5 months ago
    Each task type may need to have its own cancellation implementation that isn’t known at the flow level.
    Does that seem sensible?
    Kevin Kho

    5 months ago
    I think anything defined on the task state handler is not even passed during the cancellation route so this can’t be done at the moment.
    Any change around this is likely going to happen for 2.0, but I don’t know anything yet about how cancellation will be treated. You’re welcome to open a feature request though
    What are the conditions under which you decide to cancel flows? I’m wondering if resource managers can somehow help, but I feel like they can’t.
    Matthew Seligson

    5 months ago
    Thanks Kevin. Is there any way to signal to a running task to cancel? Can I raise an ENDRUN for a particular task run from my flow run?
    Kevin Kho

    5 months ago
    You mean this in the context of using the API or UI right? Do you want one task to cancel but the flow to continue or do you mean cancel the flow and then cancel all currently executing tasks as well?
    Matthew Seligson

    5 months ago
    Sorry if I was being confusing. When we cancel the flow, the flow state handler can detect this. Yesterday we established that the task state handler cannot. So I’m wondering if the flow state handler, upon a cancel, can signal ENDRUN(cancelled) to the running tasks.
    Kevin Kho

    5 months ago
    Ah, that is generally very hard in Python (stopping something in another thread), so I believe Dask doesn’t support this in general. We do have logic to prevent more tasks from being submitted, but I don’t think we can abruptly cancel tasks that are already running.
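    To illustrate the point about threads: Python has no supported way to kill a running thread from outside, so the usual workaround is cooperative, where the task checks a flag between units of work. A minimal standard-library sketch follows. The run_steps function and the result dict are hypothetical names, and nothing here is Prefect or Dask API.

```python
import threading
import time


def run_steps(cancel_event, result, steps=500):
    """Do work in small steps and check the cancel flag between steps."""
    try:
        for _ in range(steps):
            if cancel_event.is_set():
                result["status"] = "cancelled"
                return
            time.sleep(0.01)  # stand-in for one unit of real work
        result["status"] = "finished"
    finally:
        # Runs on both paths, so teardown also happens on cancellation.
        result["cleaned_up"] = True


cancel_event = threading.Event()
result = {}
worker = threading.Thread(target=run_steps, args=(cancel_event, result))
worker.start()

cancel_event.set()      # request cancellation from the controlling thread
worker.join(timeout=5)  # the worker exits at its next flag check
```

    This only works if the task body is written to check the flag. A task blocked inside one long call will not notice the request until that call returns.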
    Matthew Seligson

    5 months ago
    Hi Kevin. We are still working to find a solution here. Our tasks are determining their remote execution environment at runtime, which means that they need to intelligently tear down that execution environment when a flow is cancelled. Is there a way for the running task to communicate details about this execution environment to the flow (perhaps via the context) so that the flow state handler can do cleanup upon cancellation?
    Anna Geller

    5 months ago
    @Matthew Seligson your best option would be to have a flow of flows. You could go as far as having each child flow contain only one task. This way, any child flow can be canceled when needed. Here is one example of what it may look like for a Docker agent. Later this year we will revisit this topic for 2.0. I wouldn't expect any changes to this to be contributed to Prefect 1.0, so the flow of flows seems like an option worth trying if you need that level of infrastructure control for your flows.
    Matthew Seligson

    5 months ago
    It would induce a large operational headache to wrap every task in a flow. We have 1000s of meaningful tasks and that number would double if for each of them we had to have a flow to create resources
    Can we access the dask client in the dask executor to send a message (like a dask event) to our running dask tasks?
    Kevin Kho

    5 months ago
    You can use the worker_client, similar to the code snippet here
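    The linked snippet is not reproduced in this thread. As a rough sketch of the idea only, a task running on a Dask worker could use worker_client together with a named distributed.Event as the cancellation signal. The names poll_until_cancelled, task_body, and event_name are hypothetical, and how the flow-level handler obtains a Dask client to set the event is an assumption not covered here.

```python
def poll_until_cancelled(do_step, is_cancelled, teardown, max_steps=1000):
    """Run do_step repeatedly; tear down and stop once is_cancelled() is true."""
    for step in range(max_steps):
        if is_cancelled():
            teardown()  # e.g. release the remote execution environment
            return "cancelled"
        do_step(step)
    return "finished"


def task_body(event_name, do_step, teardown):
    """Intended to run inside a task on a Dask worker (needs dask.distributed)."""
    # Imported lazily because it is only needed when running on a worker.
    from distributed import Event, worker_client

    # By default worker_client secedes this task from the worker's thread pool
    # while the block is active.
    with worker_client() as client:
        cancel_event = Event(event_name, client=client)
        return poll_until_cancelled(do_step, cancel_event.is_set, teardown)
```

    Whoever requests cancellation would then call Event(event_name, client=some_client).set() with the same name, for example one derived from the flow run ID.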