<@ULVA73B9P> I have a flow that is pulling data fr...
# marvin-in-the-wild
p
@Marvin I have a flow that is pulling data from an email account using the imap_tools python library. This flow usually just takes a couple of minutes to run, but it will sometimes stall and end up running for hours or days before crashing. I'm trying to add a timeout to cancel the run if it goes for longer than 15 minutes. I have the following parameters for my flow: """ @flow(log_prints = True, timeout_seconds = 900) """ However, the flow can still run long after that 15 minutes has passed. What should I do differently?
m
Your approach seems correct, but there's some ongoing discussion and issues related to flow timeouts and their reliability. A relevant guide you can follow is the example from the Prefect documentation:
python
from prefect import flow

@flow(timeout_seconds=900)
def my_flow():
    # Your flow logic here
    pass
However, since the timeout isn't working as expected for you, there might be a bug or an exceptional condition causing this. There are also some related issues on GitHub indicating problems with timeouts:
1. Task hangs due to multiprocessing.Pool.join(): GitHub Issue 14901
2. Ensure infrastructure is always killed when job has crashed: GitHub Issue 13518
You may want to keep an eye on these issues or contribute further details if they sound relevant. For immediate mitigation, consider an alternative timeout mechanism within your flow logic using standard Python tools like `threading` or `asyncio`. Here's an example for handling timeout using `threading`:
python
import threading
from prefect import flow, task

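def run_with_timeout(func, args=(), kwargs=None, timeout=900):
    """Run func in a worker thread and stop waiting after `timeout` seconds."""
    kwargs = kwargs or {}
    # Pre-load the result slot with a timeout error; it is overwritten when func finishes.
    result = [TimeoutError('function [%s] timeout [%s seconds] exceeded!' % (func.__name__, timeout))]

    def target():
        try:
            result[0] = func(*args, **kwargs)
        except Exception as e:
            result[0] = e

    # daemon=True so a stalled worker cannot keep the process alive after the timeout.
    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join(timeout)

    # Either the timeout placeholder or an exception raised inside func.
    if isinstance(result[0], BaseException):
        raise result[0]

    return result[0]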

@task(log_prints = True)
def pull_data_from_email():
    # Your task logic using imap_tools
    pass

@flow(log_prints=True)
def my_flow():
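    try:
        # .fn is the plain function behind the task; Prefect 2+ tasks have no .run attribute.
        run_with_timeout(pull_data_from_email.fn, timeout=900)
    except Exception as e:
        print(e)
        raise  # re-raise so the flow run is marked as failed instead of completed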

if __name__ == "__main__":
    my_flow()
This approach stops the flow from waiting once the specified timeout has elapsed. Note that Python cannot forcibly stop a thread, so the stalled call itself keeps running in the background until it returns or the process exits. To check whether your Prefect version is affected, or to add details to the bug reports, see GitHub Issue 14901 (task hangs) and GitHub Issue 13518 (infrastructure killing). For the latest updates, refer to the Prefect GitHub Issues and Documentation. Feel free to ask if you need additional help!
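If the stalled work really has to be stopped rather than just abandoned, one option is to run it in a child process, because a process can be killed while a thread cannot. The following is a minimal sketch using only the standard library, not a Prefect feature. The helper name `run_python_with_timeout` is hypothetical, and it assumes the email-pulling logic can be moved into standalone Python source that is run in a separate interpreter.

```python
import subprocess
import sys


def run_python_with_timeout(source: str, timeout: float) -> str:
    """Run Python source in a child interpreter and return its stdout.

    Hypothetical helper: if the child runs longer than `timeout` seconds,
    subprocess.run kills it and raises subprocess.TimeoutExpired.
    """
    completed = subprocess.run(
        [sys.executable, "-c", source],
        capture_output=True,  # collect stdout/stderr instead of inheriting them
        text=True,            # decode output as str
        timeout=timeout,      # child is killed when this expires
        check=True,           # raise CalledProcessError on a non-zero exit code
    )
    return completed.stdout


if __name__ == "__main__":
    # A quick child finishes normally.
    print(run_python_with_timeout("print('done')", timeout=30))

    # A stalled child is killed once the timeout expires.
    try:
        run_python_with_timeout("import time; time.sleep(60)", timeout=1)
    except subprocess.TimeoutExpired as exc:
        print(f"child killed after {exc.timeout} seconds")
```

Inside a flow, the call would sit where `run_with_timeout` is used above, with `timeout=900`. Letting `subprocess.TimeoutExpired` propagate fails the flow run instead of leaving it hanging.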