<@ULVA73B9P> I'm still struggling with the return ...
# ask-marvin
a
@Marvin I'm still struggling with the return value of the wait() method of tasks. I always get None. Can you tell me what is wrong with the following minimal example:
import time
from prefect import flow, task, get_run_logger

@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value

@flow
def minimal_wait():
    logger = get_run_logger()
    status = wait_task.submit(5).wait(10)
    logger.info(f'Status when waiting 5 s with time limit 10 s: {status}')
    status = wait_task.submit(15).wait(10)
    logger.info(f'Status when waiting 15 s with time limit 10 s: {status}')


minimal_wait()
I also get the following warning: A future was garbage collected before it resolved. Please call `.wait()` or `.result()` on futures to ensure they resolve. See https://docs.prefect.io/latest/develop/task-runners for more details.
m
The issue you're encountering is due to how Prefect's `.wait()` method works with task futures. It appears that `.wait()` only blocks until the task run finishes or the timeout elapses and does not return the task's result, which would explain the `None` you see. The warning appears when a future is garbage collected before it has resolved. Based on the information and issues found, here's a modified version of your example that retrieves and logs the task results:
import time
from prefect import flow, task, get_run_logger

@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value

@flow
def minimal_wait():
    logger = get_run_logger()
    future1 = wait_task.submit(5)
    status1 = future1.result(10)
    logger.info(f'Status when waiting 5s with time limit 10s: {status1}')

    future2 = wait_task.submit(15)
    status2 = future2.result(10)
    logger.info(f'Status when waiting 15s with time limit 10s: {status2}')

minimal_wait()
### Key Changes:
1. Using `result()`: Instead of `.wait()`, I used `.result()`, which blocks until the task completes or the timeout is hit.
2. Storing futures: Store task futures in variables to prevent them from being garbage collected before resolution.

References:
- Prefect 2.x task runners documentation: Prefect Task Runners
- GitHub Issues related to Prefect Futures and multiprocessing:
  - Pickle Hangs Due to Apparent Circular Import
  - Multiprocessing in Prefect tasks causes deadlock
  - UnfinishedRun when futures are passed to another task
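To illustrate the difference between waiting on a future and fetching its result, here is a minimal sketch that uses Python's standard `concurrent.futures` module as a stand-in. It is not Prefect code, and the assumption that Prefect futures behave analogously is not verified here. `slow_echo` and `wait_then_result` are hypothetical names chosen for this example.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_echo(seconds):
    """Hypothetical stand-in for wait_task: sleep, then return a message."""
    time.sleep(seconds)
    return f"Waited for {seconds} seconds"


def wait_then_result(seconds, timeout):
    """Return the message if the work finished within `timeout`, else None."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_echo, seconds)
        # Waiting only reports which futures completed; it carries no value.
        done, _not_done = wait([future], timeout=timeout)
        if future in done:
            # The return value has to be fetched separately via result().
            return future.result()
        return None
    # Leaving the `with` block waits for any still-running work to finish.
```

The design point is that waiting and fetching are two separate steps: the wait call answers "is it finished?", and only `result()` hands back the value.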
a
@Marvin I have now changed your code to the following to handle the TimeoutError that arises from using result(). However, I'm again getting the warning that the future was garbage collected before it resolved. Do you have any advice?
import time
from prefect import flow, task, get_run_logger

@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value

@flow
def minimal_wait():
    logger = get_run_logger()
    future1 = wait_task.submit(5)
    status1 = future1.result(10)
    logger.info(f'Status when waiting 5s with time limit 10s: {status1}')

    try:
        future2 = wait_task.submit(15)
        status2 = future2.result(10)
        logger.info(f'Status when waiting 15s with time limit 10s: {status2}')
    except TimeoutError:
        logger.info('Timeout as expected')

minimal_wait()
m
The error message indicating that the future was garbage collected before it resolved suggests that the future object is being dereferenced too soon. To avoid this, make sure the future is held in a variable and waited on. I also noticed that the logging statements in your updated code arrived as link markup, `<http://logger.info|logger.info>`, which is not valid Python; they should call `logger.info()` directly. Here's the corrected version of your code:
import time
from prefect import flow, task, get_run_logger

@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value

@flow
def minimal_wait():
    logger = get_run_logger()
    
    future1 = wait_task.submit(5)
    try:
        status1 = future1.result(10)
        logger.info(f'Status when waiting 5s with time limit 10s: {status1}')
    except TimeoutError:
        logger.info('Timeout when waiting 5s with time limit 10s')

    future2 = wait_task.submit(15)
    try:
        status2 = future2.result(10)
        logger.info(f'Status when waiting 15s with time limit 10s: {status2}')
    except TimeoutError:
        logger.info('Timeout when waiting 15s with time limit 10s')

minimal_wait()
### Key Points:
1. Hold Reference: Ensure future objects (`future1` and `future2`) are stored in variables.
2. Proper Logging: Call `logger.info()` directly instead of the link-formatted `<http://logger.info|logger.info>`.
3. Timeout Handling: Added a `try-except` block for handling `TimeoutError`.

This should help ensure that the future objects are retained until they resolve or a timeout occurs, avoiding garbage collection issues. If the issue persists, it might be related to how Prefect handles futures or to the environment in which you are running the code. Ensure that you are running the latest version of Prefect compatible with your setup. If necessary, you can also consult Prefect's documentation on futures or related GitHub issues.
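One possible reading of a persisting warning (an assumption, not confirmed in this thread) is that the 15-second task is still running when `result(10)` times out, so its future is unresolved when the flow ends. The sketch below uses the standard library's `concurrent.futures` as a stand-in rather than Prefect itself, and shows how a future that timed out once can still be resolved before it is discarded. `slow_echo` and `result_with_grace` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def slow_echo(seconds):
    """Hypothetical stand-in for wait_task: sleep, then return a message."""
    time.sleep(seconds)
    return f"Waited for {seconds} seconds"


def result_with_grace(seconds, timeout):
    """Try result() with a time limit; on timeout, note it and then
    block without a limit so the future is resolved before we let go of it."""
    events = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_echo, seconds)
        try:
            events.append(future.result(timeout=timeout))
        except FutureTimeoutError:
            events.append("timeout")
            # The work is still running; a second, unbounded call resolves it.
            events.append(future.result())
    return events
```

Whether an analogous second `.result()` or `.wait()` call silences the Prefect warning would need to be checked against the installed Prefect version.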
a
@Marvin Thank you for the new code. However, I still get the warning that the future was garbage collected before it resolved with your code.
m
It looks like environment-related issues or internal behavior in Prefect might be causing this warning to appear even though the futures are correctly retained in variables. Another approach could be to explicitly set the futures to `None` after use, so that each reference is released only once the future is no longer needed. Here's another attempt with this strategy:
import time
from prefect import flow, task, get_run_logger

@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value

@flow
def minimal_wait():
    logger = get_run_logger()
    
    future1 = wait_task.submit(5)
    try:
        status1 = future1.result(10)
        logger.info(f'Status when waiting 5s with time limit 10s: {status1}')
    except TimeoutError:
        logger.info('Timeout when waiting 5s with time limit 10s')
    finally:
        future1 = None  # Release the reference only after the future has been used

    future2 = wait_task.submit(15)
    try:
        status2 = future2.result(10)
        logger.info(f'Status when waiting 15s with time limit 10s: {status2}')
    except TimeoutError:
        logger.info('Timeout when waiting 15s with time limit 10s')
    finally:
        future2 = None  # Release the reference only after the future has been used

minimal_wait()
### Key Points:
1. Set Futures to `None` After Usage: Explicitly set the futures to `None` only after they've been used, so the references are not released any earlier than that.

This approach breaks the references explicitly and only after use. However, if the problem persists, there might be an underlying issue within Prefect itself. The tracking of future states might sit deep within Prefect's task runner implementation, or there could be issues with how long-running tasks and futures interact. In that case, it could be valuable to check the Prefect GitHub repository for existing bugs or fixes related to this, or to upgrade to the latest version if you haven't already.
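As a side note on what assigning `None` does in plain Python: it removes a strong reference rather than adding one, so on its own it cannot keep an object alive. A minimal sketch with the standard library's `weakref` module follows. `FakeFuture` and `still_alive` are hypothetical names used only for illustration and are not Prefect classes or functions.

```python
import gc
import weakref


class FakeFuture:
    """Hypothetical placeholder object; not a Prefect class."""


def still_alive(drop_reference):
    """Report whether the object survives, with or without the `= None` step."""
    future = FakeFuture()
    probe = weakref.ref(future)  # a weak reference does not keep the object alive
    if drop_reference:
        future = None  # removes the only strong reference to the object
    gc.collect()  # make collection explicit instead of relying on timing
    return probe() is not None
```

In this sketch the object survives only while the local name still points at it, which suggests that a timed-out future set to `None` would be collected at that line rather than protected from collection.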