<@ULVA73B9P> after resuming a suspended flow run, ...
# ask-marvin
w
@Marvin after resuming a suspended flow run, if I suspend that flow run, the flow run finishes in state completed, instead of the expected state suspended. Why is this?
m
The issue you're describing seems to be a known bug in the way Prefect handles resumption after a flow run has been suspended. According to Issue #9269 on Prefect's GitHub repository, pausing a flow run from the Prefect Server UI exits the flow entirely instead of pausing it. This does not happen when the `pause_flow_run` function is invoked directly within the code.

### Steps to Reproduce (Referenced in Issue)

1. Set up a minimal reproducible example:
```python
import time

from prefect import flow, task, get_run_logger
from prefect.engine import pause_flow_run  # for the in-code pause variant

@task
def append_1(num: str):
    time.sleep(6)
    return num + "1"

@task
def append_2(num: str):
    time.sleep(6)
    return num + "2"

@task
def append_num(num: str, append_with: str):
    time.sleep(6)
    return num + append_with

@task
def convert_str_to_int(num: str):
    time.sleep(6)
    return int(num)

@flow()
def minimal_reproducible_example(num: str):
    x = append_1.submit(num)
    y = append_2.submit(x.result())
    z = append_num.submit(y.result(), "12")
    logger = get_run_logger()
    logger.info(f"First result: {convert_str_to_int(x.result())}")
    logger.info(f"Second result: {convert_str_to_int(y.result())}")
    logger.info(f"Third result: {convert_str_to_int(z.result())}")
```
2. Pause using the UI vs. the function:
   - Pausing via the UI can lead to the flow terminating instead of pausing.
   - Calling `pause_flow_run(timeout=30)` directly within the flow code does not exhibit this issue.

### Workaround

- Invoke `pause_flow_run` directly within your flow code rather than pausing from the UI.

### Related Issues

- Issue #8360: Discusses ensuring tasks can rerun correctly upon resuming a flow when results are not persisted, which may also be relevant to your use case.

### Further Reading

- Prefect Docs on Flow for more on flow management and the `pause_flow_run` function.

You might need to adjust your workflow and handle pausing programmatically until this bug is resolved in the UI. For updates, keep an eye on the referenced GitHub issues or subscribe to their notifications.
w
@Marvin This is to do with suspend_flow_run. Let’s say a flow run is suspended. I resume it, but in the code of the flow run, it checks a condition and suspends itself with suspend_flow_run. I would expect the state at the end to be suspended. Instead, it is Completed. The logs indicate that prefect tried to suspend the flow run again. This seems like a bug to me.
m
It appears that the behavior you're describing, where a resumed flow run that suspends itself again ends up in a `Completed` state instead of a `Suspended` state, might be a bug. Based on the search results and available resources, here are some points and a potential workaround:

### Points to Note:

1. Flow Suspension and Resumption Mechanics:
   - A flow run can be suspended by calling the `suspend_flow_run` function or by using the Prefect UI.
   - Resuming can be done via the UI or programmatically using `resume_flow_run(FLOW_RUN_ID)`.
2. Known Issues:
   - There are known issues where resuming a flow might lead to unexpected states, as mentioned in various GitHub issues. A direct match for your exact case wasn't found, but Issue #9269 could be relevant, as it discusses incorrect behavior when pausing flows through the UI.

### Workaround Approach:

While waiting for a potential fix, you can implement a workaround in your code to handle the state checking more explicitly. Here is a hypothetical example of how you might code around this:
```python
import asyncio

from prefect import flow, task, get_client
from prefect.context import get_run_context
from prefect.states import Suspended

@task
def check_condition() -> bool:
    # Implement your condition-checking logic here.
    # Return True if the condition is met, False otherwise.
    return True

async def suspend_flow_run_if_needed(flow_run_id) -> bool:
    # Check condition
    condition_met = check_condition()

    if condition_met:
        # Propose a Suspended state for the flow run through the API
        async with get_client() as client:
            await client.set_flow_run_state(flow_run_id, state=Suspended())
        return True
    return False

@flow
async def main_flow():
    # The current flow run's ID comes from the run context, not the client
    flow_run_id = get_run_context().flow_run.id

    if not await suspend_flow_run_if_needed(flow_run_id):
        # Continue executing your flow logic if not suspended
        ...

if __name__ == "__main__":
    # main_flow is async, so it has to be driven by an event loop
    asyncio.run(main_flow())
```
### Example Code:

Here's a link to an additional code example utilizing Prefect flow for reference:
- Basic Flow Example

### Following Up:

1. GitHub Issues:
   - Monitor Prefect's GitHub repository for new issues or updates related to flow suspension and state handling.
   - Consider reporting this behavior if it persists, including your specific use case and code snippets to better illustrate the issue.
2. Documentation:
   - Refer to Prefect's official documentation on flows for the latest updates.

This workaround programmatically checks and sets the state to `Suspended`, which may bypass the issue of incorrect state transitions.
w
@Marvin I suspect the issue may be related to the key parameter in the suspend_flow_run function. The description is
```
An optional key to prevent calling suspend more than once. This
defaults to a random string and prevents suspends from running the
same suspend twice. A custom key can be supplied for custom
suspending behavior.
```
And it is used in this manner:
```python
pause_counter = _observed_flow_pauses(context)
pause_key = key or str(pause_counter)
proposed_state = Suspended(timeout_seconds=timeout, pause_key=pause_key)
```
I suspect the key might be the same in the second suspension which is why it isn't working. Is that correct?
m
Your suspicion about the `key` parameter potentially causing the issue is plausible. The `key` parameter in the `suspend_flow_run` function is indeed used to prevent the same suspend from being called multiple times. In the snippet you quoted from the Prefect source, the key is derived like this:
```python
pause_counter = _observed_flow_pauses(context)
pause_key = key or str(pause_counter)
proposed_state = Suspended(timeout_seconds=timeout, pause_key=pause_key)
```
By default, the `pause_key` is set to a string representation of the `pause_counter`, which increments each time a pause attempt is observed, so despite the docstring the default is a counter value rather than a random string. If that counter starts over when the resumed run executes again, the second suspend would get the same default key as the first, and a suspend whose key matches a previously used key may be ignored.

### Recommendation:

To ensure that each suspension within a flow run has a unique key, pass a distinct custom key for each suspend attempt.

### Example:

Here's an example illustrating how to handle custom keys:
```python
import asyncio
import uuid

from prefect import flow, task, get_client
from prefect.context import get_run_context
from prefect.states import Suspended

@task
def check_condition() -> bool:
    # Implement condition-checking logic here.
    # Return True if the condition is met, False otherwise.
    return True

async def suspend_flow_run_if_needed(flow_run_id, custom_key=None) -> bool:
    condition_met = check_condition()

    if condition_met:
        # Generate a unique key if one is not provided
        suspend_key = custom_key or str(uuid.uuid4())
        async with get_client() as client:
            await client.set_flow_run_state(
                flow_run_id, state=Suspended(pause_key=suspend_key)
            )
        return True
    return False

@flow
async def main_flow():
    # The current flow run's ID comes from the run context, not the client
    flow_run_id = get_run_context().flow_run.id

    # First suspension attempt
    if not await suspend_flow_run_if_needed(flow_run_id, custom_key="first_suspend"):
        ...

    # After resumption, check and suspend again if needed
    if not await suspend_flow_run_if_needed(flow_run_id, custom_key="second_suspend"):
        ...

if __name__ == "__main__":
    # main_flow is async, so it has to be driven by an event loop
    asyncio.run(main_flow())
```
### Following Up:

- Monitor GitHub Issues: Keep an eye on Prefect's GitHub issues for related bug reports and updates.
- Documentation: Continue to refer to Prefect's official documentation for any updated information or best practices.

By providing a unique key for each suspension attempt, you can ensure that each suspend action is treated as distinct, which should address the second suspend being ignored.
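
To make the key reasoning concrete, here is a toy model in plain Python. It is not Prefect's implementation: `ToyServer`, `ToyProcess`, and `run_twice` are hypothetical names, and the model rests on two assumptions. The first is that the server ignores a suspend whose key it has already seen. The second is that the pause counter lives in process memory and restarts when a resumed run executes in a fresh process. Under those assumptions, the default key derived as `key or str(pause_counter)` repeats on the second suspend, while an explicit unique key does not.

```python
import uuid


class ToyServer:
    """Hypothetical stand-in for the API: accepts a suspend only for unseen keys."""

    def __init__(self):
        self.seen_keys = set()

    def propose_suspend(self, pause_key: str) -> bool:
        if pause_key in self.seen_keys:
            return False  # treated as a repeat of an earlier suspend
        self.seen_keys.add(pause_key)
        return True


class ToyProcess:
    """Hypothetical flow-run process with its own in-memory pause counter."""

    def __init__(self):
        self.observed_pauses = 0

    def pause_key(self, key=None) -> str:
        # Mirrors the quoted logic: pause_key = key or str(pause_counter)
        self.observed_pauses += 1
        return key or str(self.observed_pauses)


def run_twice(server, make_key):
    """Suspend once, then 'resume' in a fresh process and suspend again."""
    results = []
    for _ in range(2):
        process = ToyProcess()  # assumption: resuming starts a new process
        results.append(server.propose_suspend(make_key(process)))
    return results


default_results = run_twice(ToyServer(), lambda p: p.pause_key())
unique_results = run_twice(ToyServer(), lambda p: p.pause_key(key=str(uuid.uuid4())))

print(default_results)  # [True, False]: the second default key collides
print(unique_results)   # [True, True]: unique keys are both accepted
```

If the real behavior matches this model, passing a distinct `key` to each `suspend_flow_run` call would be the equivalent fix in flow code. That still needs to be confirmed against the Prefect version in use.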