William Fitzmaurice
06/18/2024, 12:04 AM

Marvin
06/18/2024, 12:04 AM
`pause_flow_run` function is invoked directly within the code.
### Steps to Reproduce (Referenced in Issue)
1. Set Up a Minimal Reproducible Example:
```python
import time

from prefect import flow, get_run_logger, task
from prefect.engine import pause_flow_run  # not called below; step 2 compares calling it in the flow


@task
def append_1(num: str):
    time.sleep(6)
    return num + "1"


@task
def append_2(num: str):
    time.sleep(6)
    return num + "2"


@task
def append_num(num: str, append_with: str):
    time.sleep(6)
    return num + append_with


@task
def convert_str_to_int(num: str):
    time.sleep(6)
    return int(num)


@flow()
def minimal_reproducible_example(num: str):
    x = append_1.submit(num)
    y = append_2.submit(x.result())
    z = append_num.submit(y.result(), "12")
    logger = get_run_logger()
    logger.info(f"First result: {convert_str_to_int(x.result())}")
    logger.info(f"Second result: {convert_str_to_int(y.result())}")
    logger.info(f"Third result: {convert_str_to_int(z.result())}")
```
2. Pause Using UI vs Function:
- Pausing via the UI can lead to the flow terminating instead of pausing.
- Directly using `pause_flow_run(timeout=30)` within the flow code does not exhibit this issue.
### Workaround
- Invoke `pause_flow_run` directly within your flow code rather than pausing from the UI.
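As we understand Prefect's documented behavior, an in-code `pause_flow_run(timeout=30)` keeps the flow's process alive and periodically checks the run's state until someone resumes it; if the timeout passes first, the run fails. The sketch below is a toy, pure-Python model of that wait loop, not Prefect's implementation; `toy_pause`, `ToyPauseTimeout`, and the `is_resumed` callback are hypothetical names.

```python
import time
from typing import Callable


class ToyPauseTimeout(Exception):
    """Hypothetical error: the run was not resumed before the timeout."""


def toy_pause(is_resumed: Callable[[], bool], timeout: float, poll_interval: float) -> int:
    """Block until is_resumed() returns True, checking every poll_interval seconds.

    Returns the number of checks made. Raises ToyPauseTimeout if the deadline
    passes first. Toy model only; this is not Prefect's code.
    """
    deadline = time.monotonic() + timeout
    checks = 0
    while True:
        checks += 1
        if is_resumed():
            return checks  # resumed: the flow continues from where it paused
        if time.monotonic() >= deadline:
            raise ToyPauseTimeout(f"not resumed within {timeout} seconds")
        time.sleep(poll_interval)


if __name__ == "__main__":
    state = {"checks": 0}

    def resumed_on_third_check() -> bool:
        # Simulates someone resuming the run between the 2nd and 3rd check.
        state["checks"] += 1
        return state["checks"] >= 3

    print(toy_pause(resumed_on_third_check, timeout=1.0, poll_interval=0.01))  # 3
```

The point of the model is that the paused flow never leaves its process: the same Python call stack simply continues once the check succeeds.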
### Related Issues
- Issue #8360: Discusses ensuring tasks can rerun correctly upon resuming a flow when results are not persisted, which may also be relevant to your use case.
### Further Reading
- Prefect docs on flows, for more on flow management and the `pause_flow_run` function.
You might need to adjust your workflow and handle pausing programmatically until this bug is resolved in the UI.
If you need real-time updates, watch the referenced GitHub issues or subscribe to their notifications.

William Fitzmaurice
06/18/2024, 12:07 AM

Marvin
06/18/2024, 12:08 AM
`Completed` state instead of a `Suspended` state, might be a bug. Based on the search results and available resources, here are some points and a potential workaround:
### Points to Note:
1. Flow Suspension and Resumption Mechanics:
- A flow run can be suspended by calling the `suspend_flow_run` function or by using the Prefect UI.
- Resuming can be done via the UI or programmatically using `resume_flow_run(FLOW_RUN_ID)`.
2. Known Issues:
- There are known issues where resuming a flow might lead to unexpected states, as mentioned in various GitHub issues. Unfortunately, a direct issue matching your exact criteria wasn't found, but Issue #9269 could be relevant as it discusses incorrect behavior on suspending flows through the UI.
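Prefect's documentation notes that when a suspended flow run is resumed, the flow code executes again from the beginning, which is why persisted task results matter (the concern behind Issue #8360 above). The toy model below is not Prefect code; it only counts how often each task body runs across one suspend-and-resume cycle, with and without persisted results. `ToyResultStore`, `flow_body`, and `suspend_then_resume` are hypothetical names.

```python
from typing import Any, Callable, Dict, Optional


class ToyResultStore:
    """Hypothetical stand-in for persisted task results; not a Prefect API."""

    def __init__(self) -> None:
        self.results: Dict[str, Any] = {}
        self.executions: Dict[str, int] = {}

    def run_task(self, name: str, fn: Callable[[], Any], persist: bool) -> Any:
        # Reuse a persisted result instead of running the task body again.
        if persist and name in self.results:
            return self.results[name]
        self.executions[name] = self.executions.get(name, 0) + 1
        result = fn()
        if persist:
            self.results[name] = result
        return result


def flow_body(store: ToyResultStore, persist: bool, suspend_here: bool) -> Optional[str]:
    """One execution of the flow; every execution starts from the first line."""
    x = store.run_task("append_1", lambda: "0" + "1", persist)
    if suspend_here:
        return None  # this execution ends; resuming runs flow_body from the top
    return store.run_task("append_2", lambda: x + "2", persist)


def suspend_then_resume(persist: bool) -> ToyResultStore:
    store = ToyResultStore()
    flow_body(store, persist, suspend_here=True)  # first execution: suspends
    flow_body(store, persist, suspend_here=False)  # after resume: runs to the end
    return store


if __name__ == "__main__":
    print(suspend_then_resume(persist=True).executions)   # {'append_1': 1, 'append_2': 1}
    print(suspend_then_resume(persist=False).executions)  # {'append_1': 2, 'append_2': 1}
```

Without persistence, `append_1` runs twice because the resumed execution starts over; with persistence, its stored result is reused.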
### Workaround Approach:
While waiting for a potential fix, you can implement a workaround in your code to handle the state checking more explicitly. Here is a hypothetical example of how you might code around this:
```python
import asyncio

from prefect import flow, get_client
from prefect.context import get_run_context
from prefect.states import Suspended


def check_condition() -> bool:
    # Implement your condition-checking logic here.
    # Return True if the condition is met, False otherwise.
    return True


async def suspend_flow_run_if_needed(flow_run_id) -> bool:
    if check_condition():
        # Propose a Suspended state for this flow run through the API.
        # This records the state change; it does not by itself stop the
        # code that is already running in this process.
        async with get_client() as client:
            await client.set_flow_run_state(flow_run_id, state=Suspended())
        return True
    return False


@flow
async def main_flow():
    # The flow run ID comes from the active flow run context.
    flow_run_id = get_run_context().flow_run.id
    # Suspend if the condition is met; otherwise continue with the flow logic.
    if not await suspend_flow_run_if_needed(flow_run_id):
        ...


if __name__ == "__main__":
    # An async flow must be run in an event loop.
    asyncio.run(main_flow())
```
### Example Code:
Here’s a link to an additional code example utilizing Prefect flow for reference:
- Basic Flow Example
### Following Up:
1. GitHub Issues:
- Monitor Prefect’s GitHub repository for new issues or updates related to flow suspension and state handling.
- Consider reporting this behavior if it persists, including your specific use case and code snippets to better illustrate the issue.
2. Documentation:
- Refer to Prefect's official documentation on flows for the latest updates.
This workaround programmatically checks a condition and sets the state to `Suspended`, which may bypass the issue of incorrect state transitions.

William Fitzmaurice
06/18/2024, 12:26 AM
An optional key to prevent calling suspend more than once. This
defaults to a random string and prevents suspends from running the
same suspend twice. A custom key can be supplied for custom
suspending behavior.
And it is used in this manner:
```python
pause_counter = _observed_flow_pauses(context)
pause_key = key or str(pause_counter)
proposed_state = Suspended(timeout_seconds=timeout, pause_key=pause_key)
```
I suspect the key might be the same in the second suspension, which is why it isn't working. Is that correct?

Marvin
06/18/2024, 12:26 AM
`key` parameter potentially causing the issue is plausible. The `key` parameter of the `suspend_flow_run` function is indeed used to prevent the same suspend from running more than once.
In the Prefect source you quoted, the key is derived like this:
```python
pause_counter = _observed_flow_pauses(context)
pause_key = key or str(pause_counter)
proposed_state = Suspended(timeout_seconds=timeout, pause_key=pause_key)
```
By default, `pause_key` is the string form of `pause_counter`, which increments each time a pause attempt is observed in the current run context. If you pass the same explicit `key` to more than one suspend call, the later call reuses a key that has already fired, so that suspend may be ignored.
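A toy model may help show how the counter-based default behaves across a suspend-and-resume cycle. It assumes, as the quoted docstring describes, that a pause key which has already fired is skipped, and that each resumed execution starts with a fresh run context, so the counter restarts. `ToyFlowRun`, `ToyRunContext`, `propose_suspend`, and `execute_flow` are hypothetical stand-ins, not Prefect APIs.

```python
from dataclasses import dataclass, field
from typing import Optional, Set


@dataclass
class ToyFlowRun:
    """Hypothetical server-side record of which pause keys have already fired."""
    used_pause_keys: Set[str] = field(default_factory=set)


@dataclass
class ToyRunContext:
    """Hypothetical per-execution context; recreated on every (re)run."""
    observed_pauses: int = 0


def propose_suspend(run: ToyFlowRun, ctx: ToyRunContext, key: Optional[str] = None) -> bool:
    """Mimic `pause_key = key or str(pause_counter)`; True means the run suspends."""
    ctx.observed_pauses += 1  # the counter advances on every pause attempt
    pause_key = key or str(ctx.observed_pauses)
    if pause_key in run.used_pause_keys:
        return False  # this pause already fired: skip it and keep running
    run.used_pause_keys.add(pause_key)
    return True


def execute_flow(run: ToyFlowRun, suspend_points: int = 2) -> str:
    """One execution of a flow body that contains `suspend_points` suspend calls."""
    ctx = ToyRunContext()  # fresh context, so the counter starts from zero
    for point in range(1, suspend_points + 1):
        if propose_suspend(run, ctx):
            return f"suspended at point {point}"
    return "completed"


if __name__ == "__main__":
    run = ToyFlowRun()
    for _ in range(3):
        print(execute_flow(run))
    # suspended at point 1
    # suspended at point 2
    # completed
```

In this model the default keys are already distinct within one execution ("1", then "2"), so a repeated key only arises when the same explicit `key` is passed to two calls.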
### Recommendation:
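To make each suspension point within a flow run use its own key, either rely on the automatic counter-based keys or pass a distinct custom key to each call. Keep custom keys stable across reruns (for example, a fixed string per suspension point), since a resumed flow run executes again from the beginning.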
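A second toy model compares a fixed key per suspension point with a fresh random key (for example from `uuid.uuid4()`) generated on every call. It assumes, as the quoted docstring describes, that a key which has already fired is skipped, and that each resume re-executes the flow body from the top. `run_once`, `executions_until_complete`, `stable_keys`, and `random_keys` are hypothetical helpers, not Prefect APIs.

```python
import uuid
from typing import Callable, List, Optional, Set

KeyFactory = Callable[[], List[str]]  # returns one key per suspension point


def run_once(used_keys: Set[str], make_keys: KeyFactory) -> str:
    """One execution of a flow body with one suspend call per key."""
    for point, key in enumerate(make_keys(), start=1):
        if key not in used_keys:
            used_keys.add(key)
            return f"suspended at point {point}"
        # Otherwise the key already fired on an earlier execution: skip it.
    return "completed"


def executions_until_complete(make_keys: KeyFactory, max_executions: int = 10) -> Optional[int]:
    """Resume repeatedly; return how many executions it took to complete."""
    used_keys: Set[str] = set()
    for n in range(1, max_executions + 1):
        if run_once(used_keys, make_keys) == "completed":
            return n
    return None  # never completed within the limit


def stable_keys() -> List[str]:
    return ["first_suspend", "second_suspend"]


def random_keys() -> List[str]:
    return [str(uuid.uuid4()), str(uuid.uuid4())]


if __name__ == "__main__":
    print(executions_until_complete(stable_keys))  # 3
    print(executions_until_complete(random_keys))  # None
```

With random keys, the first suspension point gets a new key on every rerun, so in this model it fires again after each resume and the run never gets past it.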
### Example:
Here's an example illustrating how to handle custom keys:
```python
import asyncio

from prefect import flow, get_client
from prefect.context import get_run_context
from prefect.states import Suspended


def check_condition() -> bool:
    # Implement condition-checking logic here.
    # Return True if the condition is met, False otherwise.
    return True


async def suspend_flow_run_if_needed(flow_run_id, suspend_key: str) -> bool:
    if check_condition():
        # Use a fixed key per suspension point. A random key (e.g. uuid4)
        # would differ on every rerun after resumption, so the same
        # suspension would fire again instead of being skipped.
        async with get_client() as client:
            await client.set_flow_run_state(
                flow_run_id, state=Suspended(pause_key=suspend_key)
            )
        return True
    return False


@flow
async def main_flow():
    flow_run_id = get_run_context().flow_run.id
    # First suspension point; stop this execution if it suspends.
    if await suspend_flow_run_if_needed(flow_run_id, suspend_key="first_suspend"):
        return
    ...
    # After resumption, check and suspend again if needed.
    if await suspend_flow_run_if_needed(flow_run_id, suspend_key="second_suspend"):
        return
    ...


if __name__ == "__main__":
    # An async flow must be run in an event loop.
    asyncio.run(main_flow())
```
### Following Up:
- Monitor GitHub Issues: Keep an eye on Prefect's GitHub issues for related bug reports and updates.
- Documentation: Continue to refer to Prefect's official documentation for any updated information or best practices.
By providing unique keys for each suspension attempt, you can ensure that each suspend action is treated as distinct, addressing the issue of redundant suspend calls.