Theo Platt
11/10/2021, 7:46 PM
We then use AWSClientWait to monitor the status of each of those jobs. Here's the code for that mapped task -
import prefect
from prefect import task
from prefect.tasks.aws import AWSClientWait

@task
def wait_batch(job_id, delay, max_attempts):
    logger = prefect.context.get('logger')
    logger.info(f"Waiting for job to complete: {job_id}")
    waiter = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    )
    waiter.run(
        waiter_kwargs={
            'jobs': [job_id],
            'WaiterConfig': {
                'Delay': delay,
                'MaxAttempts': max_attempts
            }
        },
    )
    logger.info(f"Job complete: {job_id}")
    return job_id
But what we are sometimes seeing is one or more Batch jobs failing, which then somehow stops the other jobs from responding to this AWSClientWait call... and so the mapped task keeps running even though all the jobs have either failed or completed. Any ideas?
Kevin Kho
Theo Platt
11/10/2021, 7:49 PM
Kevin Kho
Theo Platt
11/10/2021, 8:16 PM
from prefect.tasks.aws import BatchSubmit

@task(name="My Batch call")
def run_batch(data):
    logger = prefect.context.get('logger')
    logger.info(f'{data=}')
    batchjob = BatchSubmit(job_name='myjob_batch', job_definition='myjob_batch', job_queue='prefect_efs_queue_spot')
    job_id = batchjob.run(batch_kwargs={
        'containerOverrides': {
            'environment': [
                {
                    'name': 'data',
                    'value': data
                }
            ]
        }
    })
    return job_id
Kevin Kho
Theo Platt
11/10/2021, 8:21 PM
#
# Kick off all the jobs in parallel
#
batch_job_ids = run_batch.map(
    data=somedata
)

#
# Poll for the completion of all jobs
#
exit_codes = wait_batch.map(
    batch_job_ids,
    delay=unmapped(5),
    max_attempts=unmapped(1000)
)
exit_codes.set_upstream(unmapped(batch_job_ids))
Kevin Kho
Theo Platt
11/10/2021, 8:26 PM
executor=LocalDaskExecutor(scheduler="processes")
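For context, that executor setting attaches to the Flow that wires the two mapped tasks together. A minimal sketch of the assembly, assuming Prefect 1.x and using placeholder names for the flow and its input data (run_batch and wait_batch are the tasks above):

from prefect import Flow, unmapped
from prefect.executors import LocalDaskExecutor

somedata = ["job-input-1", "job-input-2"]  # placeholder input

with Flow("aws-batch-monitoring") as flow:
    batch_job_ids = run_batch.map(data=somedata)
    exit_codes = wait_batch.map(
        batch_job_ids,
        delay=unmapped(5),
        max_attempts=unmapped(1000),
    )

# Run mapped children in parallel across local processes
flow.executor = LocalDaskExecutor(scheduler="processes")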
Kevin Kho
Theo Platt
11/10/2021, 8:32 PM
Kevin Kho
Have you tried AWSClientWait on a job id you have that is done?
Theo Platt
11/10/2021, 8:35 PM
waiter = AWSClientWait(
    client='batch',
    waiter_name='JobComplete',
)
waiter.run(
    waiter_kwargs={
        'jobs': list_of_job_ids,
        'WaiterConfig': {
            'Delay': delay,
            'MaxAttempts': max_attempts
        }
    },
)
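Another way to see exactly what state Batch reports for those job ids, outside of Prefect, is to call the DescribeJobs API directly with boto3 (the call the JobComplete waiter polls under the hood). A minimal sketch, assuming AWS credentials and region are configured; note that describe_jobs accepts at most 100 job ids per request, the same limit the final version below works around:

import boto3

def print_batch_job_states(job_ids):
    """Print the current status of each AWS Batch job id."""
    client = boto3.client('batch')
    # describe_jobs takes up to 100 job ids per request
    for pos in range(0, len(job_ids), 100):
        response = client.describe_jobs(jobs=job_ids[pos:pos + 100])
        for job in response['jobs']:
            print(job['jobId'], job['status'], job.get('statusReason', ''))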
Kevin Kho
Theo Platt
12/06/2021, 7:35 PM
@task
def wait_batches(job_ids, delay, max_attempts):
    logger = prefect.context.get('logger')
    if len(job_ids) > 0:
        logger.info(f"Waiting for job(s) to complete: {job_ids}")
        waiter = AWSClientWait(
            client='batch',
            waiter_name='JobComplete',
        )
        aws_waiter_limit: int = 100  # AWS-imposed limit on batch.describe_jobs() in boto3
        tranched_job_ids = [job_ids[pos:pos + aws_waiter_limit] for pos in range(0, len(job_ids), aws_waiter_limit)]
        for tranch in tranched_job_ids:
            logger.info(f"Tranch: {tranch[0]}, {tranch[-1]}, {len(tranch)}")
            waiter.run(
                waiter_kwargs={
                    'jobs': tranch,
                    'WaiterConfig': {
                        'Delay': delay,
                        'MaxAttempts': max_attempts
                    }
                },
            )
    return
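Presumably this single waiter task is then called once, downstream of the mapped submission, in place of the earlier wait_batch.map. A sketch with placeholder names:

from prefect import Flow

somedata = ["job-input-1", "job-input-2"]  # placeholder input

with Flow("aws-batch-monitoring") as flow:
    batch_job_ids = run_batch.map(data=somedata)
    # One waiter over the full list of job ids; it tranches internally to
    # respect the 100-id describe_jobs limit.
    wait_batches(batch_job_ids, delay=5, max_attempts=1000)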
Kevin Kho