Alexander

    1 year ago
    For some reason, after migrating from Prefect Server to Prefect Cloud, all of our tasks that raise signals.FAIL are marked as TimedOut after execution.
    22 July 2021,11:22:45 MSK	prefect.sometask	ERROR	Our job is failed
    22 July 2021,11:22:45 MSK	prefect.CloudTaskRunner	DEBUG	Task 'facebook_insights': Execution process closed, collecting result...
    22 July 2021,11:22:45 MSK	prefect.CloudTaskRunner	DEBUG	Task 'facebook_insights': Handling state change from Running to TimedOut
    The "Execution process closed, collecting result..." log comes from prefect.utilities.executors.run_with_multiprocess_timeout, but I don't see any of the other logs from that function (according to the source code, they should be there). There is indeed a timeout set on the task, about 1 hour, but according to the logs it finished in 20 minutes. How can I debug this? We are running the latest Prefect version with an ECS agent.
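To picture what "collecting result" after the process closes involves, here is a small stand-alone sketch. It is not Prefect's run_with_multiprocess_timeout; run_with_process_timeout and _child are hypothetical names, and it assumes the POSIX "fork" start method. What it shows: a wrapper of this shape should only report a timeout when the child is still running at the deadline; an exception raised in the child comes back as an error, and a child that exits without reporting anything is a separate case again.

```python
import multiprocessing
import queue

# Assumption: POSIX "fork" start method, so fn is not pickled on the way in
# and no __main__ guard is needed. Illustration only, not Prefect's code.
_ctx = multiprocessing.get_context("fork")


def _child(fn, results):
    # Runs in the child process: report either the return value or the exception.
    try:
        results.put(("ok", fn()))
    except Exception as exc:
        results.put(("error", exc))


def run_with_process_timeout(fn, timeout):
    """Hypothetical process-based timeout wrapper.

    Returns ("ok", value), ("error", exc), ("timeout", None) or ("no-result", None).
    """
    results = _ctx.Queue()
    proc = _ctx.Process(target=_child, args=(fn, results))
    proc.start()
    proc.join(timeout)  # wait for the child, but no longer than `timeout`
    if proc.is_alive():
        # The child genuinely ran out of time.
        proc.terminate()
        proc.join()
        return ("timeout", None)
    # Analogous point to the "Execution process closed, collecting result..." log.
    # Fine for small results; a very large result could keep the child blocked
    # on the pipe past the deadline, so it would be misreported as a timeout.
    try:
        return results.get(timeout=1)
    except queue.Empty:
        # The child exited without reporting anything: not a timeout.
        return ("no-result", None)
```

The fourth outcome is the design point: if the child exits without a result (for example because the exception it raised couldn't be pickled onto the queue), a wrapper that lumps that case in with real timeouts would mislabel a failed run as timed out. Whether anything like that happens inside Prefect is not established in this thread.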
    Zach Angell

    1 year ago
    Hi @Alexander, could you show an example of how you're raising signals.FAIL in one of your tasks?
    Alexander

    1 year ago
    def run():
        ...
        raise signals.FAIL('somemessage')

    That's it.
    Zach Angell

    1 year ago
    Can you set the logging level to DEBUG and re-run the flow? The debug logs should show what timeout the Prefect Agent is using. The simplest way to do this is in the UI:
    • Go to Flow -> Run -> Advanced Configuration
    • Change the logging level dropdown to DEBUG
    • Start the flow run
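As an alternative to the UI dropdown, Prefect 0.x reads configuration overrides from environment variables named PREFECT__<SECTION>__<KEY>. A minimal sketch under that assumption; note that for a flow run started by an ECS agent, the variable has to be present in the environment of the container that executes the flow run, not only on the machine where the run is triggered.

```shell
# Assumption: Prefect 0.x maps PREFECT__<SECTION>__<KEY> environment variables
# onto its config, so this overrides the [logging] level setting.
export PREFECT__LOGGING__LEVEL=DEBUG
```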
    Alexander

    1 year ago
    @Zach Angell Same story.
    @Zach Angell I was able to boil it down to this case: when a task has a timeout and raises a FAIL signal, it gets converted to the TimedOut state. If the task raises an arbitrary Python exception, it works as expected.
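    import datetime

    from prefect import task
    from prefect.engine import signals


    @task(timeout=datetime.timedelta(seconds=60).seconds)
    def debug_task():
        # Reported behavior on Cloud/ECS: ends in TimedOut instead of Failed
        raise signals.FAIL('Fail signal!!')


    @task(timeout=datetime.timedelta(seconds=60).seconds)
    def debug_task_exc():
        # Ordinary exception: ends in Failed, as expected
        raise ValueError('Common error')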
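One hypothesis worth ruling out when a process-based timeout is involved (it is not confirmed anywhere in this thread): an exception raised in a child process has to be serialized to reach the parent, and custom exception classes don't always survive that round-trip, while built-ins like ValueError do. The sketch below uses the standard pickle module as an approximation of whatever serializer is actually in play. StateSignal, KeywordOnlySignal and survives_pickle are hypothetical stand-ins, not Prefect's signals; to test the real thing, pass an actual signals.FAIL(...) instance to survives_pickle.

```python
import pickle


class StateSignal(Exception):
    """Hypothetical signal-like exception that carries an extra attribute."""

    def __init__(self, message, state="Failed"):
        super().__init__(message)
        self.state = state


class KeywordOnlySignal(Exception):
    """Hypothetical exception whose __init__ cannot be rebuilt from self.args."""

    def __init__(self, *, message):
        super().__init__(message)
        self.message = message


def survives_pickle(exc):
    """Return True if exc round-trips through pickle with the same type and args."""
    try:
        clone = pickle.loads(pickle.dumps(exc))
    except Exception:
        # Unpickling re-calls type(exc)(*exc.args), which can fail for
        # exceptions with custom __init__ signatures.
        return False
    return type(clone) is type(exc) and clone.args == exc.args
```

If a real signal fails this check while ValueError passes, that would be consistent with a failure that never makes it back from the child process; it would not by itself prove that this is what the Cloud setup is doing.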
    Zach Angell

    1 year ago
    Hmm, that's interesting. I'd expect the debug logs you sent over to include a line indicating that the timeout will be respected:
    "Task '{name}': Attaching process based timeout handler..."
    With a local agent and the same debug_task format you specified, I'm not able to reproduce the issue.
    from prefect import task, Flow
    from prefect.engine import signals
    import datetime


    @task(timeout=datetime.timedelta(seconds=60).seconds)
    def foo():
        raise signals.FAIL("Fail signal")


    with Flow("timeout signal fail testing") as flow:
        foo()

    flow.register("test")
    Are both your agent and the flow source code using the latest Prefect version?
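When comparing a local run with a production run, it can help to scan the exported debug logs for the handler message quoted above and for the task's state transitions. A small stdlib sketch: timeout_handler_attached and state_changes are hypothetical helpers, and only the message texts come from this thread; the layout of the rest of each log line is assumed.

```python
import re

# Handler message quoted in this thread; {name} is the task name.
HANDLER_MSG = "Task '{name}': Attaching process based timeout handler..."

# State-change message as it appears in the log excerpt in this thread.
STATE_CHANGE = re.compile(
    r"Task '(?P<task>[^']+)': Handling state change from (?P<old>\w+) to (?P<new>\w+)"
)


def timeout_handler_attached(log_lines, task_name):
    """True if any line shows the process-based timeout handler being attached."""
    needle = HANDLER_MSG.format(name=task_name)
    return any(needle in line for line in log_lines)


def state_changes(log_lines):
    """List (task, old_state, new_state) tuples in the order they were logged."""
    changes = []
    for line in log_lines:
        match = STATE_CHANGE.search(line)
        if match:
            changes.append(match.group("task", "old", "new"))
    return changes
```

Run against the excerpt from the first message, this would report no handler line for facebook_insights and a single Running -> TimedOut transition.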
    Alexander

    1 year ago
    It doesn't reproduce for me either when I use a local setup; it reproduces only in our production Prefect Cloud environment. And yes, I have just updated both the agent and task images to 0.15.3.
    Zach Angell

    1 year ago
    Got it, I'll try to reproduce using an ECS agent today.
    @Alexander I was never able to replicate on ECS, but I wasn't using Dask (which was foolish of me). If you're using a Dask Executor, I suspect this thread is exposing the same issue https://prefect-community.slack.com/archives/CL09KU1K7/p1630392939250300. I've opened up a PR and the team will be taking a look at the issue this week.