Mitch
07/05/2024, 9:10 PM
An error was encountered while running hook 'cancel_subflows'
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/prefect/runner/runner.py", line 1203, in _run_hooks
    await hook(flow=flow, flow_run=flow_run, state=state)
  File "/pipelines/echo-localization/inspection_flows.py", line 25, in cancel_subflows
    flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[flow_run.id]), state=dict(any_=[Running(), Scheduled(), Paused(), AwaitingRetry(), Retrying(), Late()]))
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pydantic/main.py", line 341, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for FlowRunFilter
state -> any_
  extra fields not permitted (type=value_error.extra)
```
Marvin
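07/05/2024, 9:10 PM
Use `FlowRunFilterState` objects instead of simple dictionaries for the state filter. `FlowRunFilterState` has no `any_` field; it takes `name` and/or `type` sub-filters, which is why validation fails with `state -> any_: extra fields not permitted`. Here's an example of how you could refine your filtering logic: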
```python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)

# The state names you're interested in
states = ["Running", "Scheduled", "Paused", "AwaitingRetry", "Retrying", "Late"]


async def cancel_subflows(flow, flow_run, state):
    # Filter on state *names*; FlowRunFilterState expects `name`/`type` sub-filters
    state_filter = FlowRunFilterState(name=FlowRunFilterStateName(any_=states))
    # Combine it with the parent filter (this dict form already passed validation)
    flow_run_filter = FlowRunFilter(
        state=state_filter, parent_flow_run_id=dict(any_=[flow_run.id])
    )
    async with get_client() as client:
        filtered_flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter)
        # Do something with the filtered flow runs
```
This ensures that you adhere to the expected format for the `FlowRunFilter` by using `FlowRunFilterState`.
References:
- GitHub issue on state filters: https://github.com/PrefectHQ/prefect/issues/13057
- Prefect documentation (you may need to look up the latest Prefect API reference for exact usage): https://docs.prefect.io
Mitch
07/05/2024, 9:24 PM
Marvin
07/05/2024, 9:24 PM
…`on_cancellation` hooks may not always trigger as expected. There are reports and issues describing situations where the hook is not called when a flow is cancelled. For more information, see Prefect GitHub Issue #10195.
3. API Limitations and Bugs: There could be issues with Prefect's API handling, as seen in several issues and pull requests. These vary across Prefect versions and can affect the reliability of state changes. For instance, there was an open issue about reading flow run states when using filters: Prefect GitHub Issue #13057.
To ensure your state changes are consistently applied, you may consider:
- Retrying State Change: Implement retries in your flow logic to handle transient failures.
- Logging and Monitoring: Add comprehensive logging to monitor when the state changes are applied or fail.
- Updating Prefect: Ensure you are using the latest version of Prefect, as issues are frequently addressed in newer releases.
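The retry and logging suggestions can be combined in a small wrapper. This is a minimal, library-agnostic sketch: `set_state` stands in for any coroutine function that applies the state change (for example a call to `client.set_flow_run_state`), and the helper name, attempt count and delay are illustrative assumptions.

```python
import asyncio
import logging
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger("state_changes")


async def set_state_with_retries(
    set_state: Callable[[], Awaitable[T]],
    attempts: int = 3,
    delay_seconds: float = 1.0,
) -> T:
    """Hypothetical helper: retry a state-change coroutine on errors, logging each attempt."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            result = await set_state()
            logger.info("State change applied on attempt %d", attempt)
            return result
        except Exception:
            logger.warning(
                "State change failed on attempt %d/%d", attempt, attempts, exc_info=True
            )
            if attempt == attempts:
                raise  # Give up and surface the last error
            # Linear backoff before the next attempt
            await asyncio.sleep(delay_seconds * attempt)
```

Usage would look like `await set_state_with_retries(lambda: client.set_flow_run_state(run.id, Cancelled(), force=False))`. It only retries on exceptions such as network errors; a transition rejected by the server comes back as a normal return value rather than an exception.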
Without forcing a state override, the state change might not be reliable due to the above reasons. Always ensure your flow logic can handle such uncertainties gracefully.
Mitch
07/05/2024, 9:24 PM
```python
await client.set_flow_run_state(
    flow_run_id=run.id, state=Cancelled(), force=False
)
```
Marvin
07/05/2024, 9:25 PM
With `force=False`, the state change will not take effect if any of the following conditions are met:
1. State Validation: The state transition is not valid according to the state transition logic prescribed by Prefect. For example, certain states might not be allowed to transition directly to `Cancelled`.
2. Concurrency and Conflict: There might be concurrent state changes or operations that conflict with the current state change. If another operation has already changed the state, `force=False` respects the new state and does not override it.
3. State Conditions: Prefect evaluates whether the current state of the flow run meets the conditions to transition to the desired state. If the conditions are not met, the state change is rejected.
In summary, using `force=False` respects the existing state transition rules and conditions, ensuring transitions are only made when they are valid and do not conflict with other operations.
For critical operations where you need to ensure the state is set regardless of the current conditions, you may opt to use `force=True`. However, use this with caution, as it bypasses the usual state transition checks.
Mitch
07/05/2024, 9:26 PM
Marvin
07/05/2024, 9:26 PM
…`Pending`, `Running`, `Completed`, `Failed`, and `Cancelled`.
- Direct transitions from states like `Completed` or `Failed` to `Cancelled` are generally not allowed, as these states represent a terminal stage.
2. Edge Cases:
- States like `Suspended` or `Paused`, which might represent intermediate states, can have specific rules. For example, there's an update allowing a direct transition from `Suspended` to `Cancelled`: Prefect GitHub Pull Request #12582.
- Additionally, certain orchestration rules and the presence or absence of infrastructure can also influence these transitions.
3. Concurrency and Timing:
- If a flow run is still in progress (`Running`), transitioning to `Cancelled` should be allowed unless other conditions or concurrent transitions interfere.
- Here's an issue about handling `Cancelling` events: Prefect GitHub Issue #12688.
Below is an example of setting a flow run's state to `Cancelled` in a way that adheres to these rules:
```python
from prefect.client.orchestration import get_client
from prefect.states import Cancelled


async def cancel_flow_run(flow_run_id):
    async with get_client() as client:
        # force=False: the server applies its orchestration rules and may reject this
        result = await client.set_flow_run_state(
            flow_run_id=flow_run_id, state=Cancelled(), force=False
        )
    # An OrchestrationResult; check result.status to see whether it was accepted
    return result
```
For the most accurate and up-to-date information, always refer to the Prefect documentation and their GitHub repository.
Mitch
07/05/2024, 9:28 PM
Marvin
07/05/2024, 9:28 PM
```python
from prefect.client.orchestration import get_client
from prefect.states import Cancelled


async def cancel_flow_run_if_valid(flow_run_id):
    async with get_client() as client:
        # Fetch the current state of the flow run directly by ID
        current_flow_run = await client.read_flow_run(flow_run_id)

        # List of states from which we can transition to Cancelled
        valid_states_for_cancellation = [
            "Scheduled", "AwaitingRetry", "Retrying", "Late", "Pending"
        ]

        if current_flow_run.state.name in valid_states_for_cancellation:
            result = await client.set_flow_run_state(
                flow_run_id=flow_run_id, state=Cancelled(), force=False
            )
            return result
        print(f"Cannot transition from {current_flow_run.state.name} to Cancelled")
        return None

# Example usage
# await cancel_flow_run_if_valid(your_flow_run_id)
```
Adhering to the state transition rules ensures you maintain the integrity of your workflows.
Mitch
07/05/2024, 9:29 PM
Marvin
07/05/2024, 9:29 PM
Without `force=True`, Prefect performs checks to ensure that state transitions adhere to the defined workflow rules and do not conflict with other changes. As a result, flow runs in states like `Scheduled`, `AwaitingRetry`, `Retrying`, `Late`, and `Pending` may continue to execution if their current conditions do not justify cancellation.
### Explanation of Why This Happens
1. State Checks: Prefect checks if the transition is valid according to the current state and upcoming events. If it determines that execution is imminent or necessary, it may not honor the transition to Cancelled.
2. Concurrency Control: Prefect may be handling other state transitions or executions concurrently, affecting the timing and priority of the cancellation request.
3. State-Specific Logic: Certain states have specific logic for transitions. For example, Retrying might have a retry mechanism that delays or prevents immediate cancellation without forcing.
### Solutions
1. Using `force=True`: This can override the checks and force the state transition, ensuring the flow run is cancelled regardless of its current state.
2. Polling and Retrying: Implement a mechanism to repeatedly check and attempt the state transition until it succeeds.
Here is how you can implement the solution with `force=True`:
```python
from prefect.client.orchestration import get_client
from prefect.states import Cancelled


async def force_cancel_flow_run(flow_run_id):
    async with get_client() as client:
        # force=True skips the orchestration rules that could reject the transition
        result = await client.set_flow_run_state(
            flow_run_id=flow_run_id, state=Cancelled(), force=True
        )
    return result

# Example usage
# await force_cancel_flow_run(your_flow_run_id)
```
### Example Usage:
1. Ensure you have the flow run ID you want to cancel.
2. Call the `force_cancel_flow_run` function with the flow run ID.
This makes the transition to `Cancelled` take place immediately, bypassing the usual checks. It changes the run's recorded state, though; work that is already executing is not necessarily interrupted.
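For the "polling and retrying" option, a loop can re-request the change and re-read the run until it reports the target state or a timeout expires. This is a library-agnostic sketch: `request_cancel` and `read_state_name` are hypothetical stand-ins for client calls (for example `client.set_flow_run_state(...)` and reading `flow_run.state.name` after `client.read_flow_run(...)`), and the interval and timeout defaults are assumptions.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_until_cancelled(
    request_cancel: Callable[[], Awaitable[object]],
    read_state_name: Callable[[], Awaitable[str]],
    timeout_seconds: float = 60.0,
    interval_seconds: float = 2.0,
) -> bool:
    """Hypothetical helper: re-request cancellation until the run reads back as Cancelled.

    Returns True once the run reports "Cancelled", False if the timeout expires first.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        if await read_state_name() == "Cancelled":
            return True
        if time.monotonic() >= deadline:
            return False
        # Ask again: the previous request may have been rejected or not yet applied
        await request_cancel()
        await asyncio.sleep(interval_seconds)
```

Returning a boolean instead of raising lets the caller decide whether a timeout should escalate, for example to `force=True`.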
For more detailed information, refer to the Prefect documentation or relevant GitHub issues and pull requests.
Mitch
07/05/2024, 9:33 PM
Marvin
07/05/2024, 9:33 PM
…`on_cancellation` hooks and displaying the correct flow state in the Prefect UI. Here are some identified problems and solutions:
### Issues Identified:
1. Pending Tasks:
- When tasks are still pending or running at the time of cancellation, the `on_cancellation` hook may not execute immediately, or at all.
- The UI might still show the bars indicating activity, even though the flow state is marked as `Cancelled`.
2. Runner Behavior:
- There is an issue with how the Prefect runner handles cancellation when enhanced cancellation is enabled. The hooks might not run until all pending tasks are completed or terminated.
- GitHub Issue #12688 highlights problems with handling 'Cancelling' events.
### Solutions and Workarounds:
1. Enhanced Cancellation:
- Ensure that enhanced cancellation is enabled in your configuration. This setting allows the runner to handle the `on_cancellation` and `on_crashed` hooks, ensuring they execute even when tasks are still pending.
2. Code Example Adjustments:
- Move the handling of `on_cancellation` hooks so they respond as soon as cancellation is detected. This might involve custom logic in the hook to force-stop any running tasks and perform cleanup.
- GitHub PR #11026 introduced changes to run `on_cancellation` hooks at the runner level.
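```python
import time

from prefect import flow, task
from prefect.logging.loggers import flow_run_logger


def handle_cancelled(flow, flow_run, state):
    print("Cancelled hook called")
    # get_run_logger() needs an active run context, which may be missing when the
    # runner executes the hook; flow_run_logger builds a logger from the hook arguments
    logger = flow_run_logger(flow_run, flow)
    logger.info("Flow has been cancelled!")


@task
def some_task():
    time.sleep(100)  # Simulate a long-running task


@flow(on_cancellation=[handle_cancelled], log_prints=True)
def sample_flow():
    some_task.submit()
    some_task.submit()


if __name__ == "__main__":
    # serve() runs a local runner; deploy() would also need a work pool and image/storage
    sample_flow.serve(name="example-deployment")
```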
### Recommendations:
1. Monitor and Retry:
- Regularly monitor the flow state and implement retries or additional checks to ensure pending tasks are also stopped when cancellation is triggered.
2. Update Prefect:
- Make sure you are using the latest version of Prefect, as many of these issues are actively addressed in newer releases. Check the Prefect GitHub repository for updates.
### References:
1. Prefect GitHub Issues:
- GitHub Issue #12688
- GitHub Pull Request #11026
Mitch
07/05/2024, 9:41 PM
Marvin
07/05/2024, 9:41 PM
Mitch
07/05/2024, 9:41 PM
Marvin
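07/05/2024, 9:42 PM
```python
import time

from prefect import flow, task
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterId
from prefect.logging.loggers import flow_run_logger
from prefect.states import Cancelled


async def force_terminate(flow, flow_run, state):
    # on_cancellation hook: force every unfinished task run of this flow run to Cancelled.
    # This changes the recorded state only; it does not interrupt code already executing.
    async with get_client() as client:
        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter(id=FlowRunFilterId(any_=[flow_run.id]))
        )
        for task_run in task_runs:
            if task_run.state is None or not task_run.state.is_final():
                await client.set_task_run_state(task_run.id, Cancelled(), force=True)


def handle_cancelled(flow, flow_run, state):
    print("Cancelled hook called")
    logger = flow_run_logger(flow_run, flow)  # Works without an active run context
    logger.info("Flow has been cancelled!")


@task
def some_task():
    time.sleep(100)  # Simulate a long-running task


@flow(on_cancellation=[force_terminate, handle_cancelled], log_prints=True)
def sample_flow():
    some_task.submit()
    some_task.submit()


if __name__ == "__main__":
    sample_flow.serve(name="example-deployment")
```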
### Reporting the Issue
If this persists, consider reporting this behavior with detailed information and example reproduction steps on the Prefect GitHub Issues Page. This gives the Prefect team detailed feedback and context to address the problem.
If any updates or additional information become available, I'll keep an eye on related issues or documentation and let you know. For now, monitor the states closely and escalate via GitHub if needed.
Mitch
07/05/2024, 9:43 PM
Marvin
07/05/2024, 9:43 PM
```python
import time

from prefect import flow, task


@task
def some_long_task():
    try:
        time.sleep(100)
    finally:
        # The original `except PrefectCancellationException` referenced an undefined name.
        # A finally block runs whenever the task exits, including when an exception
        # unwinds it, but not if the process is killed outright.
        print("Task finished or was interrupted")
        # Add logic for cleanup or handling cancellations


# Handle cancellation for the flow
@flow(
    on_cancellation=[lambda flow, flow_run, state: print(f"Flow {flow_run.name} was cancelled!")],
    log_prints=True,
)
def sample_flow():
    some_long_task.submit()


if __name__ == "__main__":
    sample_flow()
```
4. Report Issues:
- Since this appears to be a persistent issue, consider reporting it on GitHub Issues. Provide detailed information, including screenshots of the UI discrepancies and logs showcasing the state changes.
### Example of Validating State with Prefect Client
You can validate if a flow run is truly cancelled by using the Prefect Client:
```python
from prefect.client.orchestration import get_client


async def check_flow_run_state(flow_run_id):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        print(f"Flow run state: {flow_run.state}")

# Use your actual flow run ID
# await check_flow_run_state("your_flow_run_id")
```
### Next Steps
1. Update Prefect:
- Ensure you have the latest version of Prefect, as ongoing fixes and improvements might address these UI synchronization issues.
2. Monitor GitHub for Related Issues:
- If this problem persists, keep an eye on GitHub issues or file one yourself detailing the problem, the tools you are using, and steps to reproduce.
3. Contact Prefect Support:
- For unresolved critical issues, consider reaching out to Prefect support, especially if you are using Prefect Cloud.
Providing detailed information helps Prefect's development team address these discrepancies effectively.
Mitch
07/05/2024, 9:53 PM
Marvin
07/05/2024, 9:54 PM
…`on_cancellation` hook to ensure that they do not continue executing. Here's how you can do it:
1. Set Up the `on_cancellation` Hook:
- Define a hook function that will be triggered when the flow is cancelled. This function will use the Prefect client to set the subflows to the Cancelled state.
2. Forcefully Cancel Subflows:
- Within this hook, you can forcefully set the state of the relevant subflows to ensure that they are not only marked as cancelled but also that any ongoing tasks within them are terminated.
Here is a step-by-step approach:
### Step 1: Install the Prefect Client
Ensure you have the Prefect client installed:
```sh
pip install prefect
```
### Step 2: Define the `on_cancellation` Hook
```python
import time

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.logging.loggers import flow_run_logger
from prefect.states import Cancelled


async def handle_cancellation(flow, flow_run, state):
    # flow_run_logger works even when the runner calls the hook outside a run context
    logger = flow_run_logger(flow_run, flow)
    logger.info("Cancelling subflows...")
    subflow_run_ids = []  # Populate this with the actual subflow run IDs
    async with get_client() as client:
        for subflow_run_id in subflow_run_ids:
            await client.set_flow_run_state(
                flow_run_id=subflow_run_id, state=Cancelled(), force=True
            )
    logger.info("Subflows cancelled.")


@flow
def some_subflow():
    time.sleep(30)  # Simulate long-running work inside a subflow


@flow(on_cancellation=[handle_cancellation], log_prints=True)
def main_flow():
    some_subflow()  # Calling a flow from a flow creates a subflow run
    # You might want to track subflow runs in subflow_run_ids


if __name__ == "__main__":
    main_flow.serve(name="example-deployment")
```
### Step 3: Execute the Flow
1. Run the flow, then cancel it manually from the Prefect UI or programmatically after some time.
2. Observe the logging and ensure that the `on_cancellation` hook executes, setting the states of any subflows to `Cancelled`.
### Notes
- Subflow Tracking: Make sure to track the subflow run IDs, which you need to cancel them. This often involves storing run IDs in a list that the hook can access.
- Client Usage: Use the Prefect client within your hook to ensure that the state transitions are enforced immediately.
### Example Execution
1. Deploy the Flow:
```python
if __name__ == "__main__":
    # serve() creates the deployment and starts a local runner that executes its runs
    main_flow.serve(name="example-deployment")
```
2. Cancel the Flow:
- From the Prefect UI or programmatically trigger a cancellation to test the behavior.
By implementing this approach, you ensure subflows are forcefully cancelled, preventing them from continuing execution despite being marked as cancelled in the UI.
Should issues persist, open a detailed GitHub issue on the Prefect repository with your findings for further assistance.
Mitch
07/05/2024, 9:55 PM
Marvin
07/05/2024, 9:56 PM
…`on_cancellation` hook to ensure they stop executing. Setting them to `Cancelling` first makes sense if you want to handle any cleanup or termination steps before moving them to `Cancelled`.
### Steps to Implement Forceful State Transitions in an `on_cancellation` Hook
1. Define a Cancellation Hook:
- Create a hook function to be executed when the flow enters the Cancelling state. This will forcefully set the state of subflows to Cancelling and then to Cancelled.
2. Set the State Forcefully:
- Within the hook function, use the Prefect client to set the state of the subflows.
Here's how you can achieve it:
### Example Code
```python
import asyncio
import time

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.logging.loggers import flow_run_logger
from prefect.states import Cancelled, Cancelling


async def force_subflow_cancellation(flow_run_id):
    async with get_client() as client:
        # First, set the subflow to Cancelling
        await client.set_flow_run_state(flow_run_id, Cancelling(), force=True)
        # Optional: wait here (e.g. to give a worker or runner time to act on Cancelling)
        await client.set_flow_run_state(flow_run_id, Cancelled(), force=True)


async def handle_cancellation(flow, flow_run, state):
    # flow_run_logger works even when the runner calls the hook outside a run context
    logger = flow_run_logger(flow_run, flow)
    logger.info("Cancellation hook triggered, cancelling subflows...")
    subflow_run_ids = []  # Populate this with the actual subflow run IDs
    cancellation_tasks = [
        force_subflow_cancellation(subflow_run_id) for subflow_run_id in subflow_run_ids
    ]
    await asyncio.gather(*cancellation_tasks)
    logger.info("Subflows cancelled.")


@flow
def some_subflow():
    time.sleep(100)  # Simulate long-running work inside a subflow


@flow(on_cancellation=[handle_cancellation], log_prints=True)
def main_flow():
    some_subflow()  # Calling a flow from a flow creates a subflow run
    # You may want to track subflow runs in subflow_run_ids


if __name__ == "__main__":
    main_flow.serve(name="example-deployment")
```
### Detailed Steps
1. Define the Hook Function:
- The `handle_cancellation` function is triggered on flow cancellation.
- This function iterates over the subflow run IDs and forcefully sets their state to `Cancelling` and then to `Cancelled`.
2. Populate Subflow IDs:
- Ensure that you track and populate `subflow_run_ids` with actual subflow run IDs. This tracking can be done when the subflows are submitted.
3. Force State Transition:
- The `force_subflow_cancellation` function ensures that the subflows transition to `Cancelling` and then to `Cancelled`.
- Use `force=True` to bypass the usual validation and ensure the state change happens immediately.
### Considerations
- Graceful Cleanup: If you have cleanup operations, handle them in the `Cancelling` state before transitioning to `Cancelled`.
- Error Handling: Implement appropriate error handling within your hook to manage cases where setting the state may fail.
- Testing: Thoroughly test this implementation in a staging environment to ensure it behaves as expected in various scenarios.
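For the error-handling consideration, `asyncio.gather(..., return_exceptions=True)` lets the hook attempt every subflow and report the failures instead of stopping at the first exception. A sketch, assuming `cancel_one` is any coroutine function that cancels a single run (for example the `force_subflow_cancellation` helper from the example code); `cancel_all_and_collect_errors` is a hypothetical name.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Dict, Iterable, TypeVar

K = TypeVar("K")
logger = logging.getLogger("cancellation_hook")


async def cancel_all_and_collect_errors(
    run_ids: Iterable[K],
    cancel_one: Callable[[K], Awaitable[object]],
) -> Dict[K, BaseException]:
    """Hypothetical helper: cancel every run concurrently; return {run_id: error} for failures."""
    run_ids = list(run_ids)
    # return_exceptions=True collects errors as results instead of raising the first one
    results = await asyncio.gather(
        *(cancel_one(run_id) for run_id in run_ids), return_exceptions=True
    )
    errors = {
        run_id: result
        for run_id, result in zip(run_ids, results)
        if isinstance(result, BaseException)
    }
    for run_id, error in errors.items():
        logger.error("Could not cancel run %s: %r", run_id, error)
    return errors
```

In `handle_cancellation`, the `asyncio.gather(*cancellation_tasks)` line could become `await cancel_all_and_collect_errors(subflow_run_ids, force_subflow_cancellation)`, so one failing API call no longer hides the outcome for the remaining subflows.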
### Additional References
Refer to Prefect's documentation or GitHub issues for more details and updates regarding state transitions and hooks.
By implementing the above steps, you help ensure that your subflows are correctly and promptly cancelled, preventing lingering execution and keeping the Prefect UI consistent with the actual run states.