Andreas Geissner
10/01/2024, 7:17 AM
```python
import time
from prefect import flow, task, get_run_logger


@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value


@flow
def minimal_wait():
    logger = get_run_logger()
    status = wait_task.submit(5).wait(10)
    logger.info(f'Status when waiting 5 s with time limit 10 s: {status}')
    status = wait_task.submit(15).wait(10)
    logger.info(f'Status when waiting 15 s with time limit 10 s: {status}')


minimal_wait()
```
I also get the following warning:

```
A future was garbage collected before it resolved. Please call .wait() or .result() on futures to ensure they resolve.
See https://docs.prefect.io/latest/develop/task-runners for more details.
```

Marvin
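10/01/2024, 7:18 AM
`.wait()` method works with task futures. In Prefect 3, calling `.wait()` on a task future blocks until the task finishes or the timeout elapses, but it returns `None` rather than the task's result, which is why `status` is logged as `None`. The warning comes from the second call: the 15-second task is still running when the 10-second wait ends, and the unreferenced future is then garbage collected before it resolves.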
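As a standard-library analogy (this is `concurrent.futures`, not Prefect's API), the same line exists between waiting on a future and collecting its result: waiting with a timeout only reports whether the work finished, while `result(timeout)` returns the value or raises once the timeout elapses. The helper names `wait_task` and `demo`, and the short sleep durations, are assumptions chosen to keep the sketch fast.

```python
import concurrent.futures
import time


def wait_task(t: float) -> str:
    """Hypothetical stand-in for the Prefect task: sleep, then return a sentence."""
    time.sleep(t)
    return f"Waited for {t} seconds"


def demo(short_t: float = 0.05, long_t: float = 1.0, limit: float = 0.2) -> dict:
    """Contrast waiting on futures with collecting their results."""
    outcomes = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        fast = pool.submit(wait_task, short_t)
        slow = pool.submit(wait_task, long_t)

        # Waiting only reports which futures finished; it returns no task values.
        done, _ = concurrent.futures.wait([fast, slow], timeout=limit)
        outcomes["fast_done"] = fast in done
        outcomes["slow_done"] = slow in done

        # result() returns the task's value, or raises once the timeout elapses.
        outcomes["fast_result"] = fast.result(timeout=limit)
        try:
            outcomes["slow_result"] = slow.result(timeout=limit)
        except concurrent.futures.TimeoutError:
            outcomes["slow_result"] = None  # the slow task is still running
        # Collect the slow result without a timeout so no future is left unresolved.
        outcomes["slow_final"] = slow.result()
    return outcomes
```

Calling `demo()` reports the fast future as done with its sentence available, while the slow one times out on `result(timeout=...)` and only yields its value once it is awaited without a limit.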
Based on the information and issues found, here’s a modified version of your example that ensures the task results are appropriately awaited and logged:
```python
import time
from prefect import flow, task, get_run_logger


@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value


@flow
def minimal_wait():
    logger = get_run_logger()
    future1 = wait_task.submit(5)
    status1 = future1.result(10)
    logger.info(f'Status when waiting 5s with time limit 10s: {status1}')
    future2 = wait_task.submit(15)
    status2 = future2.result(10)
    logger.info(f'Status when waiting 15s with time limit 10s: {status2}')


minimal_wait()
```
### Key Changes:
1. Using `.result()`: Instead of `.wait()`, I used `.result()`, which blocks until the task completes or the timeout is hit, and returns the task's value (raising a `TimeoutError` if the timeout is hit first).
2. Storing futures: Keep each task future in a variable so it stays referenced until its `.result()` call returns.
References:
- Prefect 2.x task runners documentation: Prefect Task Runners
- GitHub issues related to Prefect futures and multiprocessing:
  - Pickle Hangs Due to Apparent Circular Import
  - Multiprocessing in Prefect tasks causes deadlock
  - UnfinishedRun when futures are passed to another task

Andreas Geissner
10/01/2024, 7:35 AM
```python
import time
from prefect import flow, task, get_run_logger


@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value


@flow
def minimal_wait():
    logger = get_run_logger()
    future1 = wait_task.submit(5)
    status1 = future1.result(10)
    logger.info(f'Status when waiting 5s with time limit 10s: {status1}')
    try:
        future2 = wait_task.submit(15)
        status2 = future2.result(10)
        logger.info(f'Status when waiting 15s with time limit 10s: {status2}')
    except TimeoutError:
        logger.info('Timeout as expected')


minimal_wait()
```
Marvin
10/01/2024, 7:35 AM
`<http://logger.info|logger.info>`. It should use `logger.info()` directly.
Here’s the corrected version of your code:
```python
import time
from prefect import flow, task, get_run_logger


@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value


@flow
def minimal_wait():
    logger = get_run_logger()
    future1 = wait_task.submit(5)
    try:
        status1 = future1.result(10)
        logger.info(f'Status when waiting 5s with time limit 10s: {status1}')
    except TimeoutError:
        logger.info('Timeout when waiting 5s with time limit 10s')
    future2 = wait_task.submit(15)
    try:
        status2 = future2.result(10)
        logger.info(f'Status when waiting 15s with time limit 10s: {status2}')
    except TimeoutError:
        logger.info('Timeout when waiting 15s with time limit 10s')


minimal_wait()
```
### Key Points:
1. Hold Reference: Ensure future objects (`future1` and `future2`) are stored in variables.
2. Proper Logging: Use `logger.info()` directly instead of the incorrect `<http://logger.info|logger.info>` format.
3. Timeout Handling: Added a `try`/`except` block for handling `TimeoutError`.
This should help ensure that the future objects are retained until they resolve or a timeout occurs, avoiding garbage collection issues.
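In the code above, the 15-second task keeps running after `future2.result(10)` raises `TimeoutError`: the timeout ends the wait, it does not cancel the task. A future that is still unresolved when the flow returns can therefore still be garbage collected. The sketch below uses `concurrent.futures` as a stand-in for Prefect's task runner (an analogy, not Prefect's API; `wait_task` and `run_with_limit` are hypothetical helpers): it logs the timeout as an outcome, keeps the timed-out future, and resolves it before returning.

```python
import concurrent.futures
import time


def wait_task(t: float) -> str:
    # Hypothetical stand-in for the Prefect task in the thread.
    time.sleep(t)
    return f"Waited for {t} seconds"


def run_with_limit(durations, limit):
    """Wait at most `limit` seconds per task, log timeouts, then resolve stragglers."""
    messages = []
    pending = []  # futures that timed out but whose tasks are still running
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for t in durations:
            future = pool.submit(wait_task, t)
            try:
                messages.append(future.result(timeout=limit))
            except concurrent.futures.TimeoutError:
                messages.append(f"Timeout when waiting {t}s with time limit {limit}s")
                pending.append(future)  # keep a reference instead of dropping it
        # Block (without a timeout) until every timed-out task has finished.
        late_results = [f.result() for f in pending]
    return messages, late_results
```

In Prefect terms the equivalent would presumably be calling `future2.wait()` with no timeout (or `future2.result()`) before the flow returns; whether that fits depends on whether the flow may block for the full task duration.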
If the issue persists, it might be related to nuances in how Prefect handles futures or to the environment in which you are running the code. Make sure you are running the latest version of Prefect compatible with your setup, and if necessary consult the section on futures in Prefect's documentation or related GitHub issues.

Andreas Geissner
10/01/2024, 7:44 AM

Andreas Geissner
10/01/2024, 7:44 AM

Marvin
10/01/2024, 7:44 AM
`None` after usage to ensure they aren't prematurely garbage collected.
Here's another attempt with this strategy:
```python
import time
from prefect import flow, task, get_run_logger


@task
def wait_task(t):
    time.sleep(t)
    return_value = f'Waited for {t} seconds'
    get_run_logger().info(f'Now returning the following sentence: {return_value}')
    return return_value


@flow
def minimal_wait():
    logger = get_run_logger()
    future1 = wait_task.submit(5)
    try:
        status1 = future1.result(10)
        logger.info(f'Status when waiting 5s with time limit 10s: {status1}')
    except TimeoutError:
        logger.info('Timeout when waiting 5s with time limit 10s')
    finally:
        future1 = None  # Explicitly drop the reference once the future has been used
    future2 = wait_task.submit(15)
    try:
        status2 = future2.result(10)
        logger.info(f'Status when waiting 15s with time limit 10s: {status2}')
    except TimeoutError:
        logger.info('Timeout when waiting 15s with time limit 10s')
    finally:
        future2 = None  # Explicitly drop the reference once the future has been used


minimal_wait()
```
### Key Points:
1. Set Futures to `None` After Usage: Explicitly set the futures to `None` after they've been used, so each reference is released only once `.result()` has returned or timed out.

This makes the point at which each reference is dropped explicit. Note, however, that dropping a reference is what allows a future to be garbage collected, so this does not keep a timed-out, still-running future alive.
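Assigning `None` to a variable removes a reference, which is exactly what makes an object collectable. The small class below is hypothetical (not Prefect's `PrefectFuture`); it loosely mimics the warning quoted at the start of the thread by warning from `__del__` when it is collected before being resolved, which shows the effect of nulling a variable.

```python
import gc
import warnings


class TrackedFuture:
    """Hypothetical future that warns if it is collected before it resolves."""

    def __init__(self) -> None:
        self.resolved = False

    def resolve(self) -> None:
        self.resolved = True

    def __del__(self) -> None:
        if not self.resolved:
            warnings.warn(
                "A future was garbage collected before it resolved.", RuntimeWarning
            )


def drop_reference(resolve_first: bool) -> list:
    """Create a future, optionally resolve it, then set the variable to None."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        future = TrackedFuture()
        if resolve_first:
            future.resolve()
        future = None  # drops the only reference, so the object becomes collectable
        gc.collect()  # CPython frees it at the line above; this covers other runtimes
    return [str(w.message) for w in caught]
```

Under CPython the warning fires on the `future = None` line itself, so nulling a future that is still unresolved (or letting it go out of scope when the flow returns) would not be expected to avoid the message.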
However, if the problem persists, there might be an underlying issue within Prefect itself: future states are tracked deep within Prefect's task runner implementation, and there could be issues with how long-running tasks and futures interact. In that case, check the Prefect GitHub repository for existing bugs or fixes related to this, and upgrade to the latest version if you haven't already.