Hello. We are working with `prefect[aws]==0.15.3`...
# prefect-community
m
Hello. We are working with `prefect[aws]==0.15.3`. We have a flow that generates a large number of batch jobs, submits them, splits them into chunks of 100, and creates an AWS waiter object per chunk. One very large job has over 450 batch calls, and it seems to be stuck even though it progressed all the way to the end. When tracking the progress, we can see that a few tasks are stuck as mapped even though all of their child tasks are done. That is, 472 job definitions were successfully created in `create_ive_analysis`, all of them were then submitted and chunked in `submit_jobs_analysis`, and then all 5 `AWSClientWait` jobs completed (we call it with `map` on the chunked job IDs). The parent block is still in the mapped state.

Also, sometimes an `AWSClientWait` task fails but the job doesn't fail; it just stays there (again, this is with a mapped `AWSClientWait`).

Wait code:
```python
wait_res = AWSClientWait(client='batch', waiter_name='JobComplete').map(waiter_kwargs=batched_waits)
```
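where `batched_waits` is created by:
```python
from typing import List

from prefect import task
from prefect.tasks.aws.batch import BatchSubmit

BATCH_CLIENT_WAIT_MAX_SIZE = 100  # assumed value: the question says chunks of 100


@task
def submit_jobs_and_batch_job_awaits(jobs: List, batched_num=BATCH_CLIENT_WAIT_MAX_SIZE):
    # Submit every job; BatchSubmit.run returns the submitted job's ID
    submitted_jobs = [BatchSubmit().run(**job) for job in jobs]
    # Build one waiter_kwargs dict per chunk of at most `batched_num` job IDs
    waits = []
    for i in range(0, len(submitted_jobs), batched_num):
        waits.append(
            {
                'jobs': submitted_jobs[i : i + batched_num],
                'WaiterConfig': {
                    'Delay': 10,  # seconds between status polls
                    'MaxAttempts': 10000,
                },
            }
        )
    return waits  # mapped over by AWSClientWait(...).map(waiter_kwargs=...)
```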
What could cause that?
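As a sanity check on the numbers in the question: splitting 472 submitted job IDs into groups of at most 100 (the per-call limit of the Batch `DescribeJobs` API mentioned later in the thread) gives exactly 5 chunks, which matches the 5 mapped `AWSClientWait` children. A minimal pure-Python sketch, where `chunk_job_ids` is a hypothetical helper and the IDs are placeholders rather than real AWS job IDs:

```python
from typing import List, Sequence


def chunk_job_ids(job_ids: Sequence[str], size: int = 100) -> List[List[str]]:
    """Split job IDs into consecutive chunks of at most `size` items.

    `size` defaults to 100, the per-call limit of Batch DescribeJobs.
    """
    if size < 1:
        raise ValueError("size must be positive")
    return [list(job_ids[i:i + size]) for i in range(0, len(job_ids), size)]


# Hypothetical placeholder IDs standing in for the 472 submitted jobs.
job_ids = [f"job-{n}" for n in range(472)]
chunks = chunk_job_ids(job_ids)
print(len(chunks), [len(c) for c in chunks])  # 5 [100, 100, 100, 100, 72]
```

So the mapped step should have exactly 5 children here; the chunking itself is not where the hang comes from.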
k
Hi @Matan Drory, I think this is related to the size of the batch, but I'm not sure. See this issue; it might be relevant.
Actually, sorry, this looks like it ran. What is your executor?
m
Local Dask.
I see that the mapped tasks stay blue in successfully finished jobs as well. I am more worried about `AWSClientWait` failing and the job hanging.
k
Can you try using processes instead of threads for the `LocalDaskExecutor`? They tend to be more stable:
```python
LocalDaskExecutor(scheduler="processes")
```
m
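Yeah, we are:
```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor

# startup_handler, completion_handler and ive_flow_external_job_updater are
# the team's own state handlers (defined elsewhere, not shown in the thread)
with Flow(
    'ive',
    executor=LocalDaskExecutor(scheduler='processes'),
    state_handlers=[
        startup_handler,
        completion_handler,
        ive_flow_external_job_updater,
    ],
) as flow:
    ...  # flow tasks not shown in the original message
```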
k
Let me ping the guy from the GitHub discussion and see if he has any ideas.
t
That's me then, @Kevin Kho! @Matan Drory, we had similar trouble with `AWSClientWait` and ended up writing this task, which waits on the job IDs in tranches of 100:
```python
import prefect
from prefect import task
from prefect.tasks.aws.client_waiter import AWSClientWait


@task
def wait_batches(job_ids, delay, max_attempts):
    logger = prefect.context.get('logger')

    if len(job_ids) > 0:
        logger.info(f"Waiting for job(s) to complete: {job_ids}")

        waiter = AWSClientWait(
            client='batch',
            waiter_name='JobComplete',
        )

        aws_waiter_limit: int = 100  # AWS-imposed limit on batch.describe_jobs() in boto3
        tranched_job_ids = [job_ids[pos:pos + aws_waiter_limit] for pos in range(0, len(job_ids), aws_waiter_limit)]
        # Wait on one tranche at a time; each run() blocks until that tranche completes
        for tranch in tranched_job_ids:
            logger.info(f"Tranche: {tranch[0]}, {tranch[-1]}, {len(tranch)}")
            waiter.run(
                waiter_kwargs={
                    'jobs': tranch,
                    'WaiterConfig': {
                        'Delay': delay,
                        'MaxAttempts': max_attempts
                    }
                },
            )

    return
```
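The control flow of that task (submit everything first, then block on one tranche at a time inside a single task) can be sketched without Prefect or AWS so it is easy to test. `wait_in_tranches` and `wait_fn` are hypothetical names; in the task above, `wait_fn` would correspond to calling `waiter.run(waiter_kwargs={'jobs': tranche, ...})`. Because everything happens in one loop, an exception raised while waiting on any tranche stops the loop and propagates out of the caller, so the enclosing task fails rather than waiting on the remaining tranches.

```python
from typing import Callable, List, Sequence


def wait_in_tranches(
    job_ids: Sequence[str],
    wait_fn: Callable[[List[str]], None],
    tranche_size: int = 100,
) -> int:
    """Call `wait_fn` on consecutive tranches of job IDs, one at a time.

    Returns the number of tranches waited on. Any exception raised by
    `wait_fn` stops the loop and propagates to the caller.
    """
    tranches = 0
    for start in range(0, len(job_ids), tranche_size):
        # Blocks until this tranche is done before moving to the next one
        wait_fn(list(job_ids[start:start + tranche_size]))
        tranches += 1
    return tranches


# Usage with a fake waiter that only records what it was asked to wait on.
seen: List[List[str]] = []
count = wait_in_tranches([f"job-{n}" for n in range(250)], seen.append)
print(count, [len(t) for t in seen])  # 3 [100, 100, 50]
```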
m
So you are submitting everything in one go and then waiting on one group at a time, instead of mapping the waiters concurrently?
t
Yes - exactly right!
We have to wait until the last task finishes anyway.
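Why sequential waiting costs little can be checked with a small idealized simulation. It assumes each waiter checks status immediately and then sleeps `Delay` seconds between checks (how botocore waiters poll), ignores API latency and `MaxAttempts`, and uses made-up completion times; `finish_time_sequential` and `finish_time_concurrent` are hypothetical helpers. Under those assumptions both strategies finish at the first polling instant at or after the last tranche completes.

```python
import math
from typing import Sequence


def _next_poll(now: float, done_at: float, delay: float) -> float:
    """Time at which a waiter that starts polling at `now` sees the tranche as done."""
    if done_at <= now:
        return now  # the first, immediate check already succeeds
    return now + math.ceil((done_at - now) / delay) * delay


def finish_time_sequential(done_times: Sequence[float], delay: float) -> float:
    """One waiter per tranche, run one after another in a single task."""
    now = 0.0
    for done_at in done_times:
        now = _next_poll(now, done_at, delay)
    return now


def finish_time_concurrent(done_times: Sequence[float], delay: float) -> float:
    """One waiter per tranche, all started at t=0 in parallel (like a map)."""
    return max(_next_poll(0.0, done_at, delay) for done_at in done_times)


# Made-up completion times (seconds) for 5 tranches, polled every 10 s.
done = [95.0, 430.0, 120.0, 612.0, 300.0]
print(finish_time_sequential(done, 10.0), finish_time_concurrent(done, 10.0))  # 620.0 620.0
```

Since every sequential poll lands on a multiple of `Delay`, the sequential loop ends at the same grid point as the slowest concurrent waiter.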
m
Yeah, it's just so strange that it matters, since the AWS wait tasks are just boto3 Batch `describe_jobs` calls…
I understand the 100 limit (it's built into the describe call).
t
We tried a map of waiters like you did, but we saw the same unreliability. We never got to the bottom of it.
m
@Kyle McChesney