Is suspending a flow run supported by Prefect work...
# ask-community
f
Is suspending a flow run supported by Prefect workers (type=process)? I'm following the documentation at https://docs.prefect.io/v3/develop/pause-resume#suspend-a-flow-run I'm running Prefect Core version 3.1.4 on Kubernetes. One pod is running Prefect server, and another is running a Prefect worker of type process. I start a flow that calls a series of tasks that each consume CPU for a few seconds. After the first couple tasks have run, I use the Dashboard UI to suspend the flow run. The worker keeps executing the flow run, and starting new tasks even after the dashboard shows that the flow run status is Suspended. Eventually the flow run finishes and its status changes to Completed. Based on the documentation, I was expecting that when the flow code calls a task after the flow run's status is Suspended, the flow run would exit. Is this not supported when the infrastructure type is process? By the way, cancelling the flow run instead of suspended will cause the process to exit (as expected). See code in the thread.
Copy code
import math
import time
from prefect import flow, get_run_logger, task

def keep_cpu_busy(duration_seconds):
    """
    Keeps the CPU busy for the specified duration by performing calculations.

    Args:
        duration_seconds (float): The number of seconds to keep the CPU busy
    """
    start_time = time.time()

    while time.time() - start_time < duration_seconds:
        # Perform arbitrary calculations to consume CPU.
        for _ in range(10000):
            _ = math.cos(math.pi) * math.exp(math.sin(0.5))
            _ = math.sqrt(abs(math.tan(1.5)))

@task
def process_file(
    input_file_path: str,
    output_file_path: str,
    processing_time_seconds: int,
    ) -> str:
    print(f"Simulating processing of {input_file_path=} to {output_file_path=} for {processing_time_seconds} seconds.")
    keep_cpu_busy(processing_time_seconds)
    return output_file_path

@flow(
    name="suspendable"
)
def suspendable(
    num_files: int = 10,
    input_folder: str = "/mnt/input",
    output_folder: str = "/mnt/output",
    processing_time_seconds: int = 10,
    ) -> None:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"{num_files=}, {input_folder=}, {output_folder=}, {processing_time_seconds=}")
    for i in range(num_files):
        file_number = i + 1
        input_file_path = f"{input_folder}/file_{file_number}.pdf"
        output_file_path = f"{output_folder}/file_{file_number}.txt"
        <http://logger.info|logger.info>(f"Submitting task for {file_number=}")
        result = process_file(input_file_path, output_file_path, processing_time_seconds)
        <http://logger.info|logger.info>(f"Result for {file_number=}: {result}")
Sample invocation:
Copy code
prefect deployment run 'suspendable/suspendable' -p num_files=20 -p processing_time_seconds=10
c
Hey Fernando - you cannot currently suspend runs from the dashboard or API; the docs you linked show an example of planning a suspension of a run (which requires putting a "suspend" type step in the flow definition itself a-priori).
j
the doc says:
You can suspend flow runs out-of-process by calling
suspend_flow_run(flow_run_id=<ID>)
or selecting the Suspend button in the Prefect UI or Prefect Cloud.
Do you mean this is only to mark the status to suspend but not actually tearing off the running flow?
@Chris White Could you explain more on how to
require putting a "suspend" type step in the flow definition itself a-priori
?
f
@Chris White thanks for answering. I thought that suspending a flow run via the dashboard was supported because I see this feature on the version of the dashboard that I have installed (Prefect 3.1.4).
c
Hm yea, that might be an oversight related to changes from 2 to 3; I'm at an offsite but will follow up later this week
f
Thank you!