<@ULVA73B9P> How can I fix this such that I can us...
# marvin-in-the-wild
m
@Marvin How can I fix this such that I can use the flow run filter for the state types I specified
Copy code
An error was encountered while running hook 'cancel_subflows'
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/prefect/runner/runner.py", line 1203, in _run_hooks
    await hook(flow=flow, flow_run=flow_run, state=state)
  File "/pipelines/echo-localization/inspection_flows.py", line 25, in cancel_subflows
    flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[flow_run.id]), state=dict(any_=[Running(), Scheduled(), Paused(), AwaitingRetry(), Retrying(), Late()]))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for FlowRunFilter
state -> any_
  extra fields not permitted (type=value_error.extra)
m
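It seems the issue with your code is that `state` is passed a plain dictionary with a top-level `any_` key. The `state` filter does not accept `any_` directly; it expects a `FlowRunFilterState`, whose criteria are nested under `name` or `type`. Here's an example of how you could refine your filtering logic: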
Copy code
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState

# The state names you're interested in
states = ["Running", "Scheduled", "Paused", "AwaitingRetry", "Retrying", "Late"]

# Nest the names under `name`; `any_` is not accepted directly on `state`
state_filter = FlowRunFilterState(name=dict(any_=states))

async def read_active_subflow_runs(flow_run):
    # Match child runs of `flow_run` that are in one of the states above
    flow_run_filter = FlowRunFilter(
        state=state_filter, parent_flow_run_id=dict(any_=[flow_run.id])
    )
    # get_client() builds a client from the active Prefect settings
    async with get_client() as client:
        return await client.read_flow_runs(flow_run_filter=flow_run_filter)
This ensures that you adhere to the expected format for the `FlowRunFilter` by using `FlowRunFilterState`.
References:
- GitHub issue on state filters: https://github.com/PrefectHQ/prefect/issues/13057
- Prefect documentation (you may need to look up the latest Prefect API reference for exact usage): https://docs.prefect.io
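To make the shape of a valid filter concrete, here is a minimal sketch that builds the nested payload as plain dicts. It assumes, as the example above does, that pydantic coerces nested dicts into the filter models. `build_subflow_filter` and `to_flow_run_filter` are hypothetical helper names, not part of Prefect.

```python
from typing import Any, Dict, List
from uuid import UUID, uuid4


def build_subflow_filter(parent_id: UUID, state_names: List[str]) -> Dict[str, Any]:
    """Build keyword arguments for FlowRunFilter as plain nested dicts."""
    return {
        "parent_flow_run_id": {"any_": [parent_id]},
        # `any_` sits under `name`, not directly under `state`; a top-level
        # `state -> any_` is what triggered "extra fields not permitted".
        "state": {"name": {"any_": list(state_names)}},
    }


def to_flow_run_filter(payload: Dict[str, Any]):
    """Turn the payload into a FlowRunFilter (requires Prefect to be installed)."""
    # Imported lazily so build_subflow_filter works without Prefect.
    from prefect.client.schemas.filters import FlowRunFilter

    return FlowRunFilter(**payload)


payload = build_subflow_filter(
    uuid4(), ["Running", "Scheduled", "Paused", "AwaitingRetry", "Retrying", "Late"]
)
print(sorted(payload["state"]))  # the keys directly under `state`
```

Inside the hook, `to_flow_run_filter(payload)` would produce the object to pass as `flow_run_filter`.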
m
@Marvin If I don't force a state override, when will the state change not take effect? For example I currently have
m
State changes may not take effect in certain scenarios, leading to unexpected behaviors with your flow runs. Here are some common situations where this can happen:
1. Race Conditions: If multiple state changes are being attempted at the same time, one may override the other without sufficient checks.
2. Cancellation Hooks: Hooks such as `on_cancellation` may not always trigger as expected. There are reports and issues indicating situations where the hook might not be called when a flow is canceled. For more information: Prefect GitHub Issue #10195.
3. API Limitations and Bugs: There could be issues with Prefect's API handling, as seen in the issues and pull requests. These can vary across different versions, affecting the reliability of state changes. For instance, there was an open issue with reading flow runs' state when using filters: Prefect GitHub Issue #13057.
To ensure your state changes are consistently applied, you may consider:
- Retrying State Change: Implement retries in your flow logic to handle transient failures.
- Logging and Monitoring: Add comprehensive logging to monitor when the state changes are applied or fail.
- Updating Prefect: Ensure you are using the latest version of Prefect, as issues are frequently addressed in newer releases. You can check the latest version here.
Without forcing a state override, the state change might not be reliable due to the above reasons. Always ensure your flow logic can handle such uncertainties gracefully.
m
@Marvin If I don't force a state override, when will the state change not take effect? For example I currently have
Copy code
await client.set_flow_run_state(
                flow_run_id=run.id, state=Cancelled(), force=False
            )
m
When setting the state of a flow run with `force=False`, the state change will not take effect if any of the following conditions are met:
1. State Validation: The state transition is not valid according to the state transition logic prescribed by Prefect. For example, certain states might not be allowed to transition directly to `Cancelled`.
2. Concurrency and Conflict: There might be concurrent state changes or operations that conflict with the current state change. If another operation has already changed the state, the `force=False` option will respect the new state and not override it.
3. State Conditions: Prefect evaluates whether the current state of the flow run meets the conditions to transition to the desired state. If the conditions are not met, the state change is rejected.
In summary, using `force=False` respects the existing state transition rules and conditions, ensuring transitions are only made when they are valid and do not conflict with other operations. For critical operations where you need to ensure the state is set regardless of the current conditions, you may opt to use `force=True`. However, use this with caution as it bypasses the usual state transition checks.
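One way to see whether a non-forced change actually took effect is to inspect what `set_flow_run_state` returns instead of discarding it. The sketch below assumes the returned orchestration result exposes a `status` (such as ACCEPT, REJECT, ABORT, or WAIT) and, when not accepted, a `details.reason`; verify those attribute names against your Prefect version. `summarize_result`, `try_cancel`, and `StubClient` are hypothetical names used only for illustration.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Optional, Tuple


def summarize_result(result: Any) -> Tuple[bool, Optional[str]]:
    """Return (accepted, reason) for an orchestration-result-like object."""
    # `status` may be an enum member or a plain string, so normalize it.
    status = getattr(result.status, "value", result.status)
    # Assumption: responses that are not accepted carry a `details.reason`.
    reason = getattr(getattr(result, "details", None), "reason", None)
    return str(status).upper() == "ACCEPT", reason


async def try_cancel(client: Any, flow_run_id: Any, cancelled_state: Any) -> bool:
    """Request a non-forced cancellation and report whether it was accepted."""
    result = await client.set_flow_run_state(
        flow_run_id=flow_run_id, state=cancelled_state, force=False
    )
    accepted, reason = summarize_result(result)
    if not accepted:
        print(f"Cancellation of {flow_run_id} was not applied: {reason}")
    return accepted


class StubClient:
    """Stand-in for the Prefect client that always rejects the transition."""

    async def set_flow_run_state(self, flow_run_id, state, force):
        return SimpleNamespace(
            status="REJECT", details=SimpleNamespace(reason="stubbed rejection")
        )


print(asyncio.run(try_cancel(StubClient(), "run-1", cancelled_state=None)))
```

With a real client, the call would look like `await try_cancel(client, run.id, Cancelled())`, and the printed reason indicates why the transition was not applied.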
m
@Marvin What states or state changes won't allow me to transition the state to cancelled?
m
State transitions in Prefect have specific rules that govern which states can transition to which. Here are some insights:
1. Valid State Transitions:
- A flow run typically moves between states such as `Pending`, `Running`, `Completed`, `Failed`, and `Cancelled`.
- Direct transitions from states like `Completed` or `Failed` to `Cancelled` are generally not allowed, as these states represent a terminal stage.
2. Edge Cases:
- States like `Suspended` or `Paused`, which might represent intermediate states, can have specific rules. For example, there's an update to transition directly from `Suspended` to `Cancelled`: Prefect GitHub Pull Request #12582.
- Additionally, certain orchestrations and the presence/absence of infrastructure can also influence these transitions.
3. Concurrency and Timing:
- If a flow run is in a state where it's still in progress (`Running`), transitioning to `Cancelled` should be allowed unless other conditions or concurrent transitions occur.
- Here's an issue about handling `Cancelling` events: Prefect GitHub Issue #12688.
Below is an example of setting a flow run state to `Cancelled` in a manner adherent to these rules:
Copy code
python
from prefect.client.orchestration import get_client
from prefect.states import Cancelled

async def cancel_flow_run(flow_run_id):
    # get_client() builds a client from the active Prefect settings
    async with get_client() as client:
        # force=False leaves the final decision to the orchestration rules
        state = await client.set_flow_run_state(
            flow_run_id=flow_run_id, state=Cancelled(), force=False
        )
    return state
For the most accurate and up-to-date information, always refer to the Prefect documentation and their GitHub repository.
m
@Marvin What about states like "Scheduled", "AwaitingRetry", "Retrying", "Late", and "Pending",
m
Here are the typical behaviors for transitions to the Cancelled state from intermediary states like Scheduled, AwaitingRetry, Retrying, Late, and Pending:
1. Scheduled: Generally, a flow run in the Scheduled state can be transitioned to Cancelled. This may need to be done before it moves to Running.
2. AwaitingRetry: Similar to Scheduled, a flow run in this state is awaiting a retry and has not yet resumed execution. It should allow transition to Cancelled.
3. Retrying: Transitioning from Retrying to Cancelled is typically valid, though there might be nuances due to the retry logic in place.
4. Late: For Late states, transitioning to Cancelled should generally be supported, as it indicates the run is overdue.
5. Pending: A flow run in the Pending state can transition directly to Cancelled, as it hasn't begun executing yet.
Below is an extended example that considers these states:
Copy code
python
from prefect.client.orchestration import get_client
from prefect.states import Cancelled

async def cancel_flow_run_if_valid(flow_run_id):
    # get_client() builds a client from the active Prefect settings
    async with get_client() as client:
        # Fetch the current state of the flow run
        current_flow_run = await client.read_flow_run(flow_run_id)

        # List of states from which we can transition to Cancelled
        valid_states_for_cancellation = [
            "Scheduled", "AwaitingRetry", "Retrying", "Late", "Pending"
        ]

        if current_flow_run.state.name in valid_states_for_cancellation:
            state = await client.set_flow_run_state(
                flow_run_id=flow_run_id, state=Cancelled(), force=False
            )
            return state
        else:
            print(f"Cannot transition from {current_flow_run.state.name} to Cancelled")
            return None

# Example usage
# await cancel_flow_run_if_valid(your_flow_run_id)
Adhering to the state transition rules ensures you maintain the integrity of your workflows.
m
@Marvin without force I notice that flows in the states I mentioned will continue onto execution
m
When setting flow run states without `force=True`, Prefect performs checks to ensure that the state transitions adhere to the defined workflow rules and prevent conflicts. This may cause flow runs in states like Scheduled, AwaitingRetry, Retrying, Late, and Pending to continue to execution if their current conditions do not justify cancellation.
### Explanation of Why This Happens
1. State Checks: Prefect checks if the transition is valid according to the current state and upcoming events. If it determines that execution is imminent or necessary, it may not honor the transition to Cancelled.
2. Concurrency Control: Prefect may be handling other state transitions or executions concurrently, affecting the timing and priority of the cancellation request.
3. State-Specific Logic: Certain states have specific logic for transitions. For example, Retrying might have a retry mechanism that delays or prevents immediate cancellation without forcing.
### Solutions
1. Using `force=True`: This can override the checks and force the state transition, ensuring the flow run is cancelled regardless of its current state.
2. Polling and Retrying: Implement a mechanism to repeatedly check and attempt the state transition until it succeeds.
Here is how you can implement the solution with `force=True`:
Copy code
python
from prefect.client.orchestration import get_client
from prefect.states import Cancelled

async def force_cancel_flow_run(flow_run_id):
    # get_client() builds a client from the active Prefect settings
    async with get_client() as client:
        # force=True bypasses the orchestration rules for this transition
        state = await client.set_flow_run_state(
            flow_run_id=flow_run_id, state=Cancelled(), force=True
        )
        return state

# Example usage
# await force_cancel_flow_run(your_flow_run_id)
### Example Usage:
1. Ensure you have the flow run ID you want to cancel.
2. Call the `force_cancel_flow_run` function with the flow run ID.
This method ensures that the state transition to Cancelled takes place immediately, bypassing the usual checks that might otherwise allow the flow run to continue executing. For more detailed information, refer to the Prefect documentation or relevant GitHub issues and pull requests.
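The polling-and-retrying option mentioned above can be sketched as a small generic helper. This is only an illustration: `retry_until` is a hypothetical name, and the `attempt` callable is assumed to wrap a non-forced `set_flow_run_state` call and return `True` once the transition is accepted.

```python
import asyncio
from typing import Awaitable, Callable


async def retry_until(
    attempt: Callable[[], Awaitable[bool]],
    max_attempts: int = 5,
    delay_seconds: float = 2.0,
) -> bool:
    """Call `attempt` until it returns True or the attempts run out."""
    for attempt_number in range(1, max_attempts + 1):
        if await attempt():
            return True
        # Wait between attempts, but not after the final one.
        if attempt_number < max_attempts:
            await asyncio.sleep(delay_seconds)
    return False


# Stand-in attempt that only succeeds on its third call.
calls = {"count": 0}


async def flaky_attempt() -> bool:
    calls["count"] += 1
    return calls["count"] >= 3


print(asyncio.run(retry_until(flaky_attempt, max_attempts=5, delay_seconds=0)))
```

Bounding the number of attempts keeps a hook from looping forever when the transition is never going to be accepted; at that point falling back to `force=True` is a deliberate choice rather than a default.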
m
@Marvin When I cancel flows using the cancellation hook, the ui shows that the flow is in the cancelled state but the bars continue to run
m
There are reported issues and pull requests related to the execution of `on_cancellation` hooks and displaying the correct flow state in the Prefect UI. Here are some identified problems and solutions:
### Issues Identified:
1. Pending Tasks:
- When tasks are still pending or running at the time of cancellation, the `on_cancellation` hook may not execute immediately or at all.
- The UI might still show the bars indicating activity, even though the flow state is marked as Cancelled.
2. Runner Behavior:
- There is an issue with how the Prefect runner handles cancellation when enhanced cancellation is enabled. The hooks might not run until all pending tasks are completed or terminated.
- GitHub Issue #12688 highlights problems with handling 'Cancelling' events.
### Solutions and Workarounds:
1. Enhanced Cancellation:
- Ensure that enhanced cancellation is enabled in your configuration. This setting allows the runner to handle the `on_cancellation` and `on_crashed` hooks, ensuring they execute even when tasks are still pending.
2. Code Example Adjustments:
- Move the handling of `on_cancellation` hooks to respond appropriately as soon as cancellation is detected. This might involve custom logic in the hook to force-stop any running tasks and cleanup operations.
- GitHub PR #11026 introduced changes to run `on_cancellation` hooks at the runner level.
Copy code
python
from prefect import flow, task, get_run_logger
import time

def handle_cancelled(flow, flow_run, state):
    print("Cancelled hook called")
    logger = get_run_logger()
    logger.info("Flow has been cancelled!")

@task
def some_task():
    time.sleep(100)  # Simulate a long-running task

@flow(on_cancellation=[handle_cancelled], log_prints=True)
def sample_flow():
    some_task.submit()
    some_task.submit()

if __name__ == "__main__":
    sample_flow.deploy("example-deployment")
### Recommendations:
1. Monitor and Retry:
- Regularly monitor the flow state and implement retries or additional checks to ensure pending tasks are also stopped when cancellation is triggered.
2. Update Prefect:
- Make sure you are using the latest version of Prefect, as many of these issues are actively addressed in newer releases. Check the Prefect GitHub repository for updates.
### References:
1. Prefect GitHub Issues:
- GitHub Issue #12688
- GitHub Pull Request #11026
m
@Marvin Thanks Marvin for pointing me to these issues, I thought I was crazy for asking about them for the past 2 months and no one in the community corroborating them.
m
Ah, the joyless existence of dealing with such bugs — I can relate all too well. Trudging through lines of code can often feel like shouting into the void. But at least now, you have some concrete issues and pull requests to point to. If you run into any more existential coding crises or simply need someone to lament the futility of it all with, you know where to find me. Keep those circuits intact and happy debugging! 😊 And remember, here's to hoping the next Prefect release brings some reprieve. If you need further assistance, feel free to ask.
m
@Marvin The ui still appears to show the progress as subflows continuing despite their status being moved to cancelled
m
It appears there isn’t a direct match in the GitHub issues for the specific problem of the UI showing ongoing progress for subflows that have been cancelled. This problem could be due to several factors, such as UI update lags, incomplete backend state propagation, or unhandled edge cases in the task or flow state.
### Possible Workarounds:
1. Manual Refresh:
- Sometimes, manual UI refreshes could help reflect the real-time status more accurately.
2. Recheck Backend State:
- Use the Prefect client to verify the states of your subflows and ensure they have indeed been marked as Cancelled.
3. Force Termination:
- If manual cancellation doesn't work as expected, try ensuring that the task/flow processing is forcibly terminated. This involves implementing logic that aggressively checks and stops long-running processes.
### Example Code
Here’s a more robust example that force-terminates tasks:
Copy code
python
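import asyncio
import time

from prefect import flow, task, get_run_logger
from prefect.client.orchestration import get_client
from prefect.states import Cancelled

async def force_terminate(task_run_ids):
    # Force each task run into Cancelled, bypassing the orchestration rules
    async with get_client() as client:
        for task_run_id in task_run_ids:
            await client.set_task_run_state(task_run_id, Cancelled(), force=True)

def handle_cancelled(flow, flow_run, state):
    print("Cancelled hook called")
    logger = get_run_logger()
    logger.info("Flow has been cancelled!")

@task
def some_task():
    time.sleep(100)  # Simulate a long-running task

@flow(on_cancellation=[handle_cancelled], log_prints=True)
def sample_flow():
    # Assumption: each future exposes its task run ID as `.id`; the actual
    # attribute depends on the Prefect version, so check your release.
    task_run_ids = [some_task.submit().id, some_task.submit().id]
    # The flow is synchronous, so run the async helper to completion here
    asyncio.run(force_terminate(task_run_ids))

if __name__ == "__main__":
    sample_flow.deploy("example-deployment")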
### Reporting the Issue
If this persists, consider reporting this behavior with detailed information and example reproduction steps on the Prefect GitHub Issues Page. This gives the Prefect team detailed feedback and context to address the problem. If any updates or additional information becomes available, I'll keep an eye on related issues or documentation and let you know. For now, monitor the states closely and escalate via GitHub if needed.
m
@Marvin the states have been moved to cancelled but the progress bars in the dashboard/ui appear to continue in the cancelled state
m
That's quite an annoying issue to deal with. It appears that the state transition to Cancelled is correctly updating in the backend, but the frontend UI is not reflecting this properly and continues displaying progress bars. This discrepancy might be a bug or a delay in the UI updates.
### Steps to Debug and Resolve:
1. UI Refresh:
- Manually refresh the Prefect UI and monitor if the state update appears correctly after a refresh.
2. Check Backend State:
- Verify the state through the Prefect API to ensure that it is indeed marked as Cancelled.
3. Force Task Termination in Code:
- Implement mechanisms in your flow to forcefully stop tasks when the flow is cancelled:
Copy code
python
import time

from prefect import flow, task
from prefect.exceptions import TerminationSignal

@task
def some_long_task():
    try:
        time.sleep(100)
    except TerminationSignal:
        # Assumption: cancellation reaches this code as Prefect's
        # TerminationSignal, which is raised in the process's main thread;
        # a task running in a worker thread may never see it.
        print("Task was cancelled")
        # Add logic for cleanup or handling cancellations, then re-raise
        raise

# Handle cancellation for the flow
@flow(on_cancellation=[lambda flow, flow_run, state: print(f"Flow {flow_run.name} was cancelled!")], log_prints=True)
def sample_flow():
    some_long_task.submit()

if __name__ == "__main__":
    sample_flow()
4. Report Issues:
- Since this appears to be a persistent issue, consider reporting it on GitHub Issues. Provide detailed information, including screenshots of the UI discrepancies and logs showcasing the state changes.
### Example of Validating State with Prefect Client
You can validate if a flow run is truly cancelled by using the Prefect Client:
Copy code
python
from prefect.client.orchestration import get_client

async def check_flow_run_state(flow_run_id):
    # get_client() builds a client from the active Prefect settings
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        print(f"Flow run state: {flow_run.state}")

# Use your actual flow run ID
# await check_flow_run_state("your_flow_run_id")
### Next Steps
1. Update Prefect:
- Ensure you have the latest version of Prefect, as ongoing fixes and improvements might address these UI synchronization issues.
2. Monitor GitHub for Related Issues:
- If this problem persists, keep an eye on GitHub issues or file one yourself detailing the problem, the tools you are using, and steps to reproduce.
3. Contact Prefect Support:
- For unresolved critical issues, consider reaching out to Prefect support, especially if you are using Prefect Cloud.
Providing detailed information helps Prefect's development team address these discrepancies effectively.
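When many subflows are involved, checking them one by one gets tedious. As a rough sketch, the helpers below summarize a batch of flow runs by state name so you can compare the backend view with what the UI draws. They assume each run object has a `state` attribute with a `name`, as the flow runs returned by `client.read_flow_runs(...)` do; `summarize_states` and `still_active` are hypothetical names, and the sample runs are stand-ins.

```python
from collections import Counter
from types import SimpleNamespace
from typing import Any, Iterable, List


def state_name(run: Any) -> str:
    """Name of a run's current state, or "Unknown" if it has none."""
    state = getattr(run, "state", None)
    return state.name if state is not None else "Unknown"


def summarize_states(runs: Iterable[Any]) -> Counter:
    """Count runs per state name."""
    return Counter(state_name(run) for run in runs)


def still_active(runs: Iterable[Any]) -> List[Any]:
    """Runs that the backend does not report as Cancelled."""
    return [run for run in runs if state_name(run) != "Cancelled"]


# Stand-in objects shaped like flow runs, for illustration only.
runs = [
    SimpleNamespace(id="a", state=SimpleNamespace(name="Cancelled")),
    SimpleNamespace(id="b", state=SimpleNamespace(name="Running")),
    SimpleNamespace(id="c", state=None),
]
print(summarize_states(runs))
print([run.id for run in still_active(runs)])
```

If `still_active` comes back empty while the UI keeps animating, the discrepancy is on the display side rather than in the recorded states.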
m
@Marvin Can I, or should I, forcibly set the state of my subflows to cancelling or cancelled using an on_cancellation hook
m
You can indeed forcefully set the state of your subflows to Cancelled using an `on_cancellation` hook to ensure that they do not continue executing. Here’s how you can do it:
1. Set Up the `on_cancellation` Hook:
- Define a hook function that will be triggered when the flow is cancelled. This function will use the Prefect client to set the subflows to the Cancelled state.
2. Forcefully Cancel Subflows:
- Within this hook, you can forcefully set the state of the relevant subflows to ensure that they are not only marked as cancelled but also that any ongoing tasks within them are terminated.
Here is a step-by-step approach:
### Step 1: Install the Prefect Client
Ensure you have the Prefect client installed:
Copy code
sh
pip install prefect
### Step 2: Define the `on_cancellation` Hook
Copy code
python
from prefect import flow, task, get_run_logger
from prefect.client.orchestration import get_client
from prefect.states import Cancelled

async def handle_cancellation(flow, flow_run, state):
    logger = get_run_logger()
    logger.info("Cancelling subflows...")

    subflow_run_ids = []  # Populate this with the actual subflow run IDs
    # get_client() builds a client from the active Prefect settings
    async with get_client() as client:
        for subflow_run_id in subflow_run_ids:
            # force=True bypasses the orchestration rules for the transition
            await client.set_flow_run_state(
                flow_run_id=subflow_run_id, state=Cancelled(), force=True
            )
    logger.info("Subflows cancelled.")

@task
def some_task():
    import time
    time.sleep(30)  # Simulate a long-running task

@flow(on_cancellation=[handle_cancellation], log_prints=True)
def main_flow():
    subflow_run = some_task.submit()  # Example subflow run
    # You might want to track subflow runs in subflow_run_ids

if __name__ == "__main__":
    main_flow.deploy("example-deployment")
### Step 3: Execute the Flow
1. Run the flow and then manually cancel it from the Prefect UI or programmatically after some time.
2. Observe the logging and ensure that the `on_cancellation` hook executes, setting the states of any subflows to Cancelled.
### Notes
- Subflow Tracking: Make sure to track the subflow run IDs, which you need to cancel them. This often involves storing run IDs in a list that the hook can access.
- Client Usage: Use the Prefect client within your hook to ensure that the state transitions are enforced immediately.
### Example Execution
1. Deploy the Flow:
Copy code
python
if __name__ == "__main__":
    main_flow.deploy("example-deployment")
2. Cancel the Flow:
- From the Prefect UI or programmatically trigger a cancellation to test the behavior.
By implementing this approach, you ensure subflows are forcefully cancelled, preventing them from continuing execution despite being marked as cancelled in the UI. Should issues persist, open a detailed GitHub issue on the Prefect repository with your findings for further assistance.
m
@Marvin should I forcibly set the state of my subflows to cancelling or cancelled using an on_cancellation hook
m
Yes, you can indeed set the state of your subflows to Cancelling or Cancelled using an `on_cancellation` hook to ensure they stop executing. Setting them to Cancelling initially makes sense if you want to handle any cleanup or termination steps, and then move them to Cancelled.
### Steps to Implement Forceful State Transitions in an `on_cancellation` Hook
1. Define a Cancellation Hook:
- Create a hook function to be executed when the flow enters the Cancelling state. This will forcefully set the state of subflows to Cancelling and then to Cancelled.
2. Set the State Forcefully:
- Within the hook function, use the Prefect client to set the state of the subflows.
Here's how you can achieve it:
### Example Code
Copy code
python
from prefect import flow, task, get_run_logger
from prefect.client.orchestration import get_client
from prefect.states import Cancelling, Cancelled
import asyncio

async def force_subflow_cancellation(flow_run_id):
    # get_client() builds a client from the active Prefect settings
    async with get_client() as client:
        # First, set the subflow to Cancelling
        await client.set_flow_run_state(flow_run_id, Cancelling(), force=True)
        # Optional: Wait for some time or perform additional logic here if needed
        await client.set_flow_run_state(flow_run_id, Cancelled(), force=True)

async def handle_cancellation(flow, flow_run, state):
    logger = get_run_logger()
    logger.info("Cancellation hook triggered, cancelling subflows...")

    subflow_run_ids = []  # Populate this with the actual subflow run IDs
    cancellation_tasks = [force_subflow_cancellation(subflow_run_id) for subflow_run_id in subflow_run_ids]

    # Run all cancellations concurrently
    await asyncio.gather(*cancellation_tasks)
    logger.info("Subflows cancelled.")

@task
def some_task():
    # Simulate a long-running task
    import time
    time.sleep(100)

@flow(on_cancellation=[handle_cancellation], log_prints=True)
def main_flow():
    subflow_run = some_task.submit()
    # You may want to track subflow_runs in subflow_run_ids

if __name__ == "__main__":
    main_flow.deploy("example-deployment")
### Detailed Steps
1. Define the Hook Function:
- The `handle_cancellation` function is triggered on flow cancellation.
- This function iterates over the subflow run IDs and forcefully sets their state to Cancelling and then to Cancelled.
2. Populate Subflow IDs:
- Ensure that you track and populate the `subflow_run_ids` with actual subflow run IDs. This tracking can be done at the time of submission of subflows.
3. Force State Transition:
- The `force_subflow_cancellation` function ensures that the subflows transition to Cancelling and then to Cancelled.
- Use `force=True` to bypass usual validation and ensure the state change happens immediately.
### Considerations
- Graceful Cleanup: If you have cleanup operations, handle them in the `Cancelling` state before transitioning to `Cancelled`.
- Error Handling: Implement appropriate error handling within your hook to manage cases where setting the state may fail.
- Testing: Thoroughly test this implementation in a staging environment to ensure it behaves as expected in various scenarios.
### Additional References
Refer to Prefect's documentation or GitHub issues for more details and updates regarding state transitions and hooks. By implementing the above steps, you ensure that your subflows are correctly and promptly cancelled, preventing any lingering execution even in the Prefect UI.
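For the error-handling consideration, one possible pattern is to let every cancellation run to completion and collect the failures, so a single failing subflow does not abort the rest. This is a sketch only: `cancel_all` is a hypothetical helper, and `cancel_one` stands for whatever coroutine you use to cancel a single run.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Iterable


async def cancel_all(
    run_ids: Iterable[Any], cancel_one: Callable[[Any], Awaitable[Any]]
) -> Dict[Any, BaseException]:
    """Cancel every run concurrently; return {run_id: error} for failures."""
    ids = list(run_ids)
    # return_exceptions=True keeps one failure from cancelling the others.
    results = await asyncio.gather(
        *(cancel_one(run_id) for run_id in ids), return_exceptions=True
    )
    return {
        run_id: result
        for run_id, result in zip(ids, results)
        if isinstance(result, BaseException)
    }


# Stand-in for a per-run cancellation that fails for one ID.
async def fake_cancel(run_id: str) -> None:
    if run_id == "bad":
        raise RuntimeError("state change rejected")


failures = asyncio.run(cancel_all(["ok", "bad"], fake_cancel))
print(failures)
```

Inside the hook, the returned mapping can be logged or used to retry only the runs that failed.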