Matan Drory
02/03/2022, 5:35 PM
prefect[aws]==0.15.3. We have a flow that generates a large number of batch jobs, submits them, splits them into chunks of 100, and creates an AWS waiter object per chunk of 100.
We have one very large job with over 450 batch calls. This job seems to be stuck even though it progressed to the end.
When tracking the progress, we can see that a few tasks are stuck as mapped even though all of their child tasks are done. I.e., 472 definitions were created successfully in create_ive_analysis, then all of them were submitted and chunked in submit_jobs_analysis, and then all 5 AWSClientWait jobs finished (we call it with map on the chunked job IDs). The parent block is still in the Mapped state.
Also, sometimes an AWSClientWait task fails and the job doesn't fail; it just stays there (again, this is with a mapped AWSClientWait).
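For reference, the chunk arithmetic described above (472 job IDs, at most 100 per waiter, hence 5 waiters) can be sketched in plain Python. This is only an illustration: `build_waiter_kwargs` and the `job-N` IDs are hypothetical names, and the `Delay` and `MaxAttempts` defaults are copied from the snippet in this thread.

```python
from typing import Any, Dict, List


def build_waiter_kwargs(
    job_ids: List[str],
    chunk_size: int = 100,
    delay: int = 10,
    max_attempts: int = 10000,
) -> List[Dict[str, Any]]:
    """Return one waiter_kwargs dict per chunk of at most `chunk_size` job IDs."""
    return [
        {
            "jobs": job_ids[start:start + chunk_size],
            "WaiterConfig": {"Delay": delay, "MaxAttempts": max_attempts},
        }
        for start in range(0, len(job_ids), chunk_size)
    ]


# Placeholder IDs standing in for the 472 submitted Batch jobs.
job_ids = [f"job-{n}" for n in range(472)]
batched = build_waiter_kwargs(job_ids)

# 472 IDs split into chunks of 100, 100, 100, 100 and 72.
chunk_sizes = [len(kwargs["jobs"]) for kwargs in batched]
```

Each dict in `batched` has the shape that is passed as `waiter_kwargs` to the mapped AWSClientWait call.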
Wait code
wait_res = AWSClientWait(client='batch', waiter_name='JobComplete').map(waiter_kwargs=batched_waits)
Where batched_waits is created by
@task
def submit_jobs_and_batch_job_awaits(jobs: List, batched_num=BATCH_CLIENT_WAIT_MAX_SIZE):
    # Submit every job definition and collect the returned job IDs.
    submitted_jobs = [BatchSubmit().run(**job) for job in jobs]
    # Build one waiter_kwargs dict per chunk of at most batched_num job IDs.
    waits = []
    for i in range(0, len(submitted_jobs), batched_num):
        waits.append(
            {
                'jobs': submitted_jobs[i : i + batched_num],
                'WaiterConfig': {
                    'Delay': 10,
                    'MaxAttempts': 10000,
                },
            }
        )
    return waits
What could cause that?
Kevin Kho
Matan Drory
02/03/2022, 6:43 PM
Kevin Kho
LocalDaskExecutor(scheduler="processes")
Matan Drory
02/03/2022, 6:50 PM
with Flow(
    'ive',
    executor=LocalDaskExecutor(scheduler='processes'),
    state_handlers=[
        startup_handler,
        completion_handler,
        ive_flow_external_job_updater,
    ],
) as flow:
Kevin Kho
Theo Platt
02/03/2022, 7:00 PM
@task
def wait_batches(job_ids, delay, max_attempts):
    logger = prefect.context.get('logger')
    if len(job_ids) > 0:
        logger.info(f"Waiting for job(s) to complete: {job_ids}")
        waiter = AWSClientWait(
            client='batch',
            waiter_name='JobComplete',
        )
        aws_waiter_limit: int = 100  # AWS-imposed limit on batch.describe_jobs() in boto3
        tranched_job_ids = [job_ids[pos:pos + aws_waiter_limit] for pos in range(0, len(job_ids), aws_waiter_limit)]
        # Wait on each tranche in turn, inside this single task.
        for tranch in tranched_job_ids:
            logger.info(f"Tranch: {tranch[0]}, {tranch[-1]}, {len(tranch)}")
            waiter.run(
                waiter_kwargs={
                    'jobs': tranch,
                    'WaiterConfig': {
                        'Delay': delay,
                        'MaxAttempts': max_attempts
                    }
                },
            )
    return
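As a complement to waiting on tranches, one way to make failed Batch jobs surface explicitly is to query the final statuses in the same tranches of 100 and raise if any job ended as FAILED. The following is a minimal sketch, not code from this thread: `summarize_job_statuses`, `raise_if_any_failed`, and `FakeBatchClient` are hypothetical names, and `batch_client` is assumed to behave like `boto3.client("batch")`, whose `describe_jobs` call accepts up to 100 job IDs.

```python
from collections import Counter
from typing import Dict, Iterable, List

DESCRIBE_JOBS_LIMIT = 100  # describe_jobs accepts at most 100 job IDs per call


def chunked(items: List[str], size: int) -> Iterable[List[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def summarize_job_statuses(batch_client, job_ids: List[str]) -> Dict[str, int]:
    """Count jobs per status, querying in tranches of at most 100 IDs.

    Assumption: `batch_client` behaves like boto3.client("batch").
    """
    counts: Counter = Counter()
    for tranche in chunked(job_ids, DESCRIBE_JOBS_LIMIT):
        response = batch_client.describe_jobs(jobs=tranche)
        counts.update(job["status"] for job in response["jobs"])
    return dict(counts)


def raise_if_any_failed(batch_client, job_ids: List[str]) -> None:
    """Raise if at least one job reports the FAILED status."""
    counts = summarize_job_statuses(batch_client, job_ids)
    failed = counts.get("FAILED", 0)
    if failed:
        raise RuntimeError(f"{failed} Batch job(s) failed: {counts}")


class FakeBatchClient:
    """Hypothetical stand-in for the real client, used only to exercise the sketch."""

    def __init__(self, statuses: Dict[str, str]):
        self.statuses = statuses
        self.calls = 0

    def describe_jobs(self, jobs: List[str]) -> dict:
        self.calls += 1
        return {"jobs": [{"jobId": j, "status": self.statuses[j]} for j in jobs]}
```

Called from inside a task after the waiters return, an exception raised by `raise_if_any_failed` would be expected to mark that task as failed rather than leaving the outcome implicit.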
Matan Drory
02/03/2022, 7:24 PM
Theo Platt
02/03/2022, 7:32 PM
Matan Drory
02/03/2022, 7:33 PM
Theo Platt
02/03/2022, 7:34 PM
Matan Drory
03/09/2022, 6:30 PM