# ask-community
t
Hi there. I think this is a simple question! I have some flows (generally many hours long) where the last task fails but the flow continues to run. The last task happens to be an AWSClientWait on a Batch job but I don't know if this is the reason or not. Or alternatively is there a way to fail a flow if any of the tasks fail? Thanks as always!
k
Hey @Theo Platt, a Flow should fail automatically if a terminal task fails. You can set the terminal tasks when you construct your flow with flow.terminal_tasks.
Copy code
with Flow(...) as flow:
    a = task_one()
    b = task_two()

flow.terminal_tasks = [a, b]
With AWSClientWait, I’ve seen issues with timeouts. There might be a default timeout on the boto3 side causing failures (I’ve seen 12 hours). I have also seen issues when mapping a lot of these calls (over 100). Are you doing that?
Oh that previous issue was also you lol
I was wondering why I see so many AWS Batch questions 😆
t
Yes - lot of those are me! 😉 Trying to pay it back with the solutions I find.
In this case it was the very last, terminal task, but I still had to manually cancel the run in Prefect Cloud as it was still 'blue'
and here's the code for that final task. In this case it did actually time out after delay * max_attempts had passed (quick arithmetic note after the code), so the task failed but the flow still thought all was good.
Copy code
import prefect
from prefect import task
from prefect.tasks.aws.client_waiter import AWSClientWait


@task
def wait_batch(job_id, delay, max_attempts):

    logger = prefect.context.get('logger')

    logger.info(f"Waiting for job to complete: {job_id}")

    waiter = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    )
    waiter.run(
        waiter_kwargs={
            'jobs': [job_id],
            'WaiterConfig': {
                'Delay': delay,
                'MaxAttempts': max_attempts
            }
        },
    )

    logger.info(f"Job complete: {job_id}")

    return job_id
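For context, the waiter's effective ceiling is roughly Delay × MaxAttempts seconds before AWSClientWait gives up and raises FAIL. A quick sanity check with illustrative numbers (not the actual values from this run):
Copy code
# Approximate time before the waiter gives up and AWSClientWait raises FAIL.
# Illustrative values only -- not the actual Delay/MaxAttempts used in this run.
delay = 60          # seconds between polls
max_attempts = 720  # number of polls
timeout_seconds = delay * max_attempts
print(timeout_seconds / 3600)  # 12.0 hours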
k
Are you on ECS by chance?
t
yes
both the agent and the flow execution
k
I think this is a similar issue. We opened an internal ticket for it. Do you get a heartbeat failure in the logs? Does your ECS container show it exited?
t
The flow logs look like this so the flow is still running -
k
Ok thanks. Will add this to our internal issue. Do you get any message like this though on the ECS side?
t
Nope - the actual Batch job kept on running and eventually finished successfully about an hour after the timeout we'd imposed on the AWSClientWait
k
But the Batch job runs on different compute than the ECS container, right? So ECS shows everything went as normal?
t
Yes - that's right
Everything seems to be working great except for the flow not failing when that last task fails
k
Ok, it’s honestly hard for me to tell if this is expected or not. A failed task does not necessarily update the state of the Flow, so what it looks like is that the ECS compute is still going even though the task is marked as failed and the FlowRunner has not exited yet. Part of this probably has to do with the fact that timeouts are best-effort and not guaranteed. It’s very hard to terminate an ongoing Python program, and it gets even harder when it’s happening on different machines (not the case here). It could be that the timeout here is just failing to terminate the task and the Flow keeps going.
Either way, I think the thing for me to do here is add your report to the internal issue from the previously mentioned thread. It may be related.
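One option that might be worth trying here, if the goal is simply to guarantee the flow run fails whenever the wait task fails, is to make that task an explicit reference task when building the flow. A minimal sketch, assuming Prefect 1.x; submit_batch and the simplified wait_batch below are stand-ins, not the actual flow code:
Copy code
from prefect import Flow, task


@task
def submit_batch():
    # Stand-in for the real task that submits the AWS Batch job.
    return "job-123"


@task
def wait_batch(job_id, delay, max_attempts):
    # Stand-in for the AWSClientWait wrapper shown earlier in the thread.
    return job_id


with Flow("batch-flow") as flow:
    job_id = submit_batch()
    done = wait_batch(job_id, delay=60, max_attempts=720)

# A flow run's final state is determined by its reference tasks
# (by default, the terminal tasks). Making the wait task an explicit
# reference task means the flow run fails whenever that task fails.
flow.set_reference_tasks([done])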
t
Thanks as always @Kevin Kho! I'll keep looking my side too. It does look like the AWSClientWait fails ok and raises a FAIL. https://github.com/PrefectHQ/prefect/blob/d44b72a950ebda9f7bc6a9712fc71e2e9c680d25/src/prefect/tasks/aws/client_waiter.py#L118-L121
Maybe, since I have this wrapped in my own wait_batch task (same code as above), I should be capturing the exception myself?
k
Oh I see. Yes, I suppose capturing that may give you more informative logs.
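A minimal sketch of what that could look like, assuming Prefect 1.x (the only change to the wait_batch task above is the try/except around waiter.run; the FAIL signal is re-raised so the task still ends up Failed):
Copy code
import prefect
from prefect import task
from prefect.engine.signals import FAIL
from prefect.tasks.aws.client_waiter import AWSClientWait


@task
def wait_batch(job_id, delay, max_attempts):
    logger = prefect.context.get('logger')
    logger.info(f"Waiting for job to complete: {job_id}")

    waiter = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    )
    try:
        waiter.run(
            waiter_kwargs={
                'jobs': [job_id],
                'WaiterConfig': {
                    'Delay': delay,
                    'MaxAttempts': max_attempts
                }
            },
        )
    except FAIL:
        # Log the waiter failure with some context, then re-raise so the
        # task run is still marked as Failed.
        logger.error(f"AWSClientWait gave up waiting on job: {job_id}")
        raise

    logger.info(f"Job complete: {job_id}")
    return job_id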