Andreas Geissner
10/01/2024, 8:17 AMAndreas Geissner
10/01/2024, 9:23 AMimport time
from prefect import flow, task, get_run_logger
@task
def wait_task(t):
time.sleep(t)
return_value = f'Waited for {t} seconds'
get_run_logger().info(f'Now returning the following sentence: {return_value}')
return return_value
@flow
def minimal_wait():
logger = get_run_logger()
future1 = wait_task.submit(5)
try:
status1 = future1.result(10)
<http://logger.info|logger.info>(f'Status when waiting 5s with time limit 10s: {status1}')
except TimeoutError:
<http://logger.info|logger.info>('Timeout when waiting 5s with time limit 10s')
finally:
future1 = None # Explicitly nullify to avoid premature garbage collection
future2 = wait_task.submit(15)
try:
status2 = future2.result(10)
<http://logger.info|logger.info>(f'Status when waiting 15s with time limit 10s: {status2}')
except TimeoutError:
<http://logger.info|logger.info>('Timeout when waiting 15s with time limit 10s')
finally:
future2 = None # Explicitly nullify to avoid premature garbage collection
minimal_wait()
Nate
10/01/2024, 2:52 PMfuture2
, you stop waiting for it after 10 seconds .result(10)
, which means its still running in the background. then by setting it to None, you're removing the reference to it, actually allowing garbage collection
if you replace future2 = None
with future2.wait()
or future.result()
, then you wouldnt get that warning
for example, this does not throw the warning
import time
from prefect import flow, get_run_logger, task
@task
def wait_task(t):
time.sleep(t)
return_value = f"Waited for {t} seconds"
get_run_logger().info(f"Now returning the following sentence: {return_value}")
return return_value
@flow
def minimal_wait():
logger = get_run_logger()
future2 = wait_task.submit(5)
try:
status2 = future2.result(2)
logger.info(f"Status when waiting 5s with time limit 2s: {status2}")
except TimeoutError:
logger.info("Timeout when waiting 5s with time limit 2s")
finally:
future2.wait()
minimal_wait()
Andreas Geissner
10/01/2024, 3:03 PM