Hi everyone, some of my code is not working anymo...
# ask-community
a
Hi everyone, some of my code is not working anymore because submit.wait() always yields None, no matter what the task returns. The amazing Marvin helped me develop a minimal example that uses result() instead of wait(), but I still get the warning that the Future was garbage collected before it resolved. Have I hit a bug, or is there something wrong with my code? Cheers, Andreas
import time
from prefect import flow, task, get_run_logger


@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value


@flow
def minimal_wait():
    logger = get_run_logger()

    future1 = wait_task.submit(5)
    try:
        status1 = future1.result(10)
        logger.info(f'Status when waiting 5s with time limit 10s: {status1}')
    except TimeoutError:
        logger.info('Timeout when waiting 5s with time limit 10s')
    finally:
        future1 = None  # Explicitly nullify to avoid premature garbage collection

    future2 = wait_task.submit(15)
    try:
        status2 = future2.result(10)
        logger.info(f'Status when waiting 15s with time limit 10s: {status2}')
    except TimeoutError:
        logger.info('Timeout when waiting 15s with time limit 10s')
    finally:
        future2 = None  # Explicitly nullify to avoid premature garbage collection


minimal_wait()
n
Hi @Andreas Geissner, I think what's happening is that in the case of `future2`, you stop waiting for it after 10 seconds (`.result(10)`), which means it's still running in the background. Then, by setting it to None, you're removing the reference to it, which actually allows garbage collection. If you replace `future2 = None` with `future2.wait()` or `future2.result()`, you wouldn't get that warning. For example, this does not throw the warning:
import time

from prefect import flow, get_run_logger, task


@task
def wait_task(t):
    time.sleep(t)
    return_value = f"Waited for {t} seconds"
    get_run_logger().info(f"Now returning the following sentence: {return_value}")
    return return_value


@flow
def minimal_wait():
    logger = get_run_logger()

    future2 = wait_task.submit(5)
    try:
        status2 = future2.result(2)
        logger.info(f"Status when waiting 5s with time limit 2s: {status2}")
    except TimeoutError:
        logger.info("Timeout when waiting 5s with time limit 2s")
    finally:
        future2.wait()


minimal_wait()
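As a rough analogy, the same pattern can be sketched with only the standard library. This uses `concurrent.futures`, not Prefect's own future implementation, so treat the parallel as an assumption; the helper name `timed_out_then_waited` and the short durations are made up for illustration. It shows that a timeout in `result()` only stops the waiting, while the work itself keeps running until you wait for it again:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def wait_task(t):
    # Stand-in for the Prefect task: sleep, then return a message.
    time.sleep(t)
    return f"Waited for {t} seconds"


def timed_out_then_waited(task_seconds=0.5, limit=0.1):
    # Hypothetical helper, only for this illustration.
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(wait_task, task_seconds)
        try:
            # Give up waiting after `limit` seconds; the task is not cancelled.
            future.result(timeout=limit)
            timed_out = False
        except FutureTimeoutError:
            timed_out = True
        # The work is still in progress after the timed-out wait.
        still_running = not future.done()
        # Keep the reference and wait again without a limit.
        final = future.result()
    return timed_out, still_running, final


if __name__ == "__main__":
    print(timed_out_then_waited())
```

The second, unbounded `result()` call plays the same role as `future2.wait()` in the flow above: the reference is kept until the work has actually finished.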
a
Thanks a lot! Setting to None was Marvin's final attempt to help solve that. But your code worked for me as well.