    Theo Platt

    9 months ago
    Hi there. I think this is a simple question! I have some flows (generally many hours long) where the last task fails but the flow continues to run. The last task happens to be an AWSClientWait on a Batch job but I don't know if this is the reason or not. Or alternatively is there a way to fail a flow if any of the tasks fail? Thanks as always!
    Kevin Kho

    9 months ago
    Hey @Theo Platt a Flow should fail automatically if a terminal task fails. You can set this when you construct your flow with flow.terminal_tasks:
    with Flow(...) as flow:
       a = task_one()
       b = task_two()
    
    flow.terminal_tasks = [a,b]
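    A minimal sketch of the related reference-task mechanism, assuming Prefect 1.x: the flow run's final state is determined by the flow's reference tasks, which default to the terminal tasks, so they can also be pinned explicitly (the flow and task names below just reuse the example above).
    from prefect import Flow, task

    @task
    def task_one():
        return 1

    @task
    def task_two():
        return 2

    with Flow("example") as flow:
        a = task_one()
        b = task_two()

    # Reference tasks drive the flow run's final state; if any of them fail,
    # the flow run is marked Failed.
    flow.set_reference_tasks([a, b])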
    With AWSClientWait, I’ve seen issues with timeouts. There might be a default timeout there causing failures on the boto3 side (I’ve seen 12 hours). I have also seen issues when mapping a lot of these calls (over 100). Are you doing that?
    Oh that previous issue was also you lol
    I was wondering why I see so many AWS Batch questions 😆
    Theo Platt

    9 months ago
    Yes - a lot of those are me! 😉 Trying to pay it back with the solutions I find.
    In this case it was the very last, terminal task, but I still had to manually cancel the run in Prefect Cloud as it was still 'blue'.
    And here's the code for that final task. In this case it did actually time out after delay * max_attempts had passed, so the task failed but the flow still thought all was good.
    import prefect
    from prefect import task
    from prefect.tasks.aws.client_waiter import AWSClientWait

    @task
    def wait_batch(job_id, delay, max_attempts):

        logger = prefect.context.get('logger')

        logger.info(f"Waiting for job to complete: {job_id}")

        # Poll the AWS Batch job until it reaches a terminal state
        waiter = AWSClientWait(
            client='batch',
            waiter_name='JobComplete',
        )
        waiter.run(
            waiter_kwargs={
                'jobs': [job_id],
                'WaiterConfig': {
                    'Delay': delay,
                    'MaxAttempts': max_attempts
                }
            },
        )

        logger.info(f"Job complete: {job_id}")

        return job_id
    Kevin Kho

    9 months ago
    Are you on ECS by chance?
    Theo Platt

    9 months ago
    yes
    both the agent and the flow execution
    Kevin Kho

    9 months ago
    I think this is a similar issue. We opened an internal ticket for it. Do you get a heartbeat failure in the logs? Does your ECS container show it exited?
    Theo Platt

    9 months ago
    The flow logs look like this so the flow is still running -
    Kevin Kho

    9 months ago
    Ok thanks. Will add this to our internal issue. Do you get any message like this though on the ECS side?
    Theo Platt

    9 months ago
    Nope - the actual Batch job kept on running and eventually finished successfully about an hour after the timeout we'd imposed on the AWSClientWait
    Kevin Kho

    9 months ago
    But the batch is different compute than the ECS container right? So ECS shows everything went as normal?
    Theo Platt

    9 months ago
    Yes - that's right
    Everything seems to be working great except for the flow not failing when that last task fails
    Kevin Kho

    9 months ago
    Ok, it’s honestly hard for me to tell if this is expected or not. A failed task does not necessarily update the state of the Flow, so it looks like the ECS compute is still going even though the task is marked as failed and the FlowRunner has not exited yet. Part of this probably has to do with the fact that timeouts are best-effort and not guaranteed. It’s very hard to terminate an ongoing Python program, and it gets even harder when it’s happening on different machines (not the case here). It could be that the timeout here is simply failing to terminate the task, and the Flow just keeps going.
    Either way, I think the thing for me to do here is add your issue to our internal issue for the previously mentioned thread. It may be related.
    Theo Platt

    9 months ago
    Thanks as always @Kevin Kho! I'll keep looking my side too. It does look like the AWSClientWait fails ok and raises a FAIL. https://github.com/PrefectHQ/prefect/blob/d44b72a950ebda9f7bc6a9712fc71e2e9c680d25/src/prefect/tasks/aws/client_waiter.py#L118-L121
    Maybe, since I have this wrapped in my own task, I should be capturing the exception?
    @task
    def wait_batch(job_id, delay, max_attempts):

        logger = prefect.context.get('logger')

        logger.info(f"Waiting for job to complete: {job_id}")

        waiter = AWSClientWait(
            client='batch',
            waiter_name='JobComplete',
        )
        waiter.run(
            waiter_kwargs={
                'jobs': [job_id],
                'WaiterConfig': {
                    'Delay': delay,
                    'MaxAttempts': max_attempts
                }
            },
        )

        logger.info(f"Job complete: {job_id}")

        return job_id
    Kevin Kho

    9 months ago
    Oh I see. Yes, I suppose capturing that may give you more informative logs.
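    A minimal sketch of what that capture might look like, assuming Prefect 1.x, where AWSClientWait raises a prefect.engine.signals.FAIL on a waiter error (per the lines linked above): log the failure, then re-raise the signal so the task run is still marked Failed.
    import prefect
    from prefect import task
    from prefect.engine.signals import FAIL
    from prefect.tasks.aws.client_waiter import AWSClientWait

    @task
    def wait_batch(job_id, delay, max_attempts):
        logger = prefect.context.get('logger')
        logger.info(f"Waiting for job to complete: {job_id}")

        waiter = AWSClientWait(
            client='batch',
            waiter_name='JobComplete',
        )
        try:
            waiter.run(
                waiter_kwargs={
                    'jobs': [job_id],
                    'WaiterConfig': {
                        'Delay': delay,
                        'MaxAttempts': max_attempts,
                    },
                },
            )
        except FAIL as exc:
            # Surface the waiter error in the task logs, then re-raise the signal
            # so the task run (and, if this is a reference task, the flow run) fails.
            logger.error(f"Batch job {job_id} did not complete: {exc}")
            raise

        logger.info(f"Job complete: {job_id}")
        return job_id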