Fernando Correia
12/16/2024, 10:51 PMFernando Correia
12/16/2024, 10:52 PMimport math
import time
from prefect import flow, get_run_logger, task
def keep_cpu_busy(duration_seconds):
"""
Keeps the CPU busy for the specified duration by performing calculations.
Args:
duration_seconds (float): The number of seconds to keep the CPU busy
"""
start_time = time.time()
while time.time() - start_time < duration_seconds:
# Perform arbitrary calculations to consume CPU.
for _ in range(10000):
_ = math.cos(math.pi) * math.exp(math.sin(0.5))
_ = math.sqrt(abs(math.tan(1.5)))
@task
def process_file(
input_file_path: str,
output_file_path: str,
processing_time_seconds: int,
) -> str:
print(f"Simulating processing of {input_file_path=} to {output_file_path=} for {processing_time_seconds} seconds.")
keep_cpu_busy(processing_time_seconds)
return output_file_path
@flow(
name="suspendable"
)
def suspendable(
num_files: int = 10,
input_folder: str = "/mnt/input",
output_folder: str = "/mnt/output",
processing_time_seconds: int = 10,
) -> None:
logger = get_run_logger()
<http://logger.info|logger.info>(f"{num_files=}, {input_folder=}, {output_folder=}, {processing_time_seconds=}")
for i in range(num_files):
file_number = i + 1
input_file_path = f"{input_folder}/file_{file_number}.pdf"
output_file_path = f"{output_folder}/file_{file_number}.txt"
<http://logger.info|logger.info>(f"Submitting task for {file_number=}")
result = process_file(input_file_path, output_file_path, processing_time_seconds)
<http://logger.info|logger.info>(f"Result for {file_number=}: {result}")
Fernando Correia
12/16/2024, 10:52 PMprefect deployment run 'suspendable/suspendable' -p num_files=20 -p processing_time_seconds=10
Chris White
Ji-Oh Yoo
12/18/2024, 8:45 PMYou can suspend flow runs out-of-process by callingDo you mean this is only to mark the status to suspend but not actually tearing off the running flow?or selecting the Suspend button in the Prefect UI or Prefect Cloud.suspend_flow_run(flow_run_id=<ID>)
Ji-Oh Yoo
12/18/2024, 8:46 PMrequire putting a "suspend" type step in the flow definition itself a-priori
?Fernando Correia
12/18/2024, 9:27 PMChris White
Fernando Correia
12/19/2024, 12:39 AM