Theo Platt
12/06/2021, 7:25 PM
Kevin Kho
flow.terminal_tasks.
with Flow(...) as flow:
    a = task_one()
    b = task_two()
flow.terminal_tasks = [a, b]
With AWSClientWait, I’ve seen issues with timeouts. There might be a default timeout on the boto3 side causing failures (I’ve seen 12 hours). I have also seen issues when mapping a lot of these calls (over 100). Are you doing that?
Kevin Kho
Kevin Kho
Theo Platt
12/06/2021, 7:36 PM
Theo Platt
12/06/2021, 7:39 PM
Theo Platt
12/06/2021, 7:40 PM
Theo Platt
12/06/2021, 7:43 PM
@task
def wait_batch(job_id, delay, max_attempts):
    logger = prefect.context.get('logger')
    logger.info(f"Waiting for job to complete: {job_id}")
    waiter = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    )
    waiter.run(
        waiter_kwargs={
            'jobs': [job_id],
            'WaiterConfig': {
                'Delay': delay,
                'MaxAttempts': max_attempts
            }
        },
    )
    logger.info(f"Job complete: {job_id}")
    return job_id
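A note on the `Delay` and `MaxAttempts` values passed in `WaiterConfig`: a boto3 waiter polls, sleeps `Delay` seconds, and gives up after `MaxAttempts` polls, so the task blocks for at most about their product before it raises. Below is a minimal sketch of that arithmetic. The helper names `max_wait_seconds` and `attempts_for` are hypothetical and not part of Prefect or boto3, and the numbers are illustrative rather than taken from the thread.

```python
def max_wait_seconds(delay: int, max_attempts: int) -> int:
    """Rough upper bound, in seconds, on how long a waiter sleeps.

    Assumes one poll per `delay` seconds for at most `max_attempts` polls.
    The first poll happens immediately, so the real sleep time is closer to
    delay * (max_attempts - 1); request latency is ignored.
    """
    if delay < 0 or max_attempts < 1:
        raise ValueError("delay must be >= 0 and max_attempts >= 1")
    return delay * max_attempts


def attempts_for(timeout_seconds: int, delay: int) -> int:
    """Smallest MaxAttempts whose total wait covers `timeout_seconds`."""
    if delay < 1 or timeout_seconds < 0:
        raise ValueError("delay must be >= 1 and timeout_seconds >= 0")
    # Ceiling division without importing math.
    return max(1, -(-timeout_seconds // delay))


if __name__ == "__main__":
    # Polling every 30 seconds for 120 attempts covers about one hour.
    print(max_wait_seconds(30, 120))
    # Attempts needed to cover 12 hours when polling once a minute.
    print(attempts_for(12 * 3600, 60))
```

With these helpers, a job that may run for up to 12 hours and is polled once a minute would need `max_attempts` of at least `attempts_for(12 * 3600, 60)`, which is 720.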
Kevin Kho
Theo Platt
12/06/2021, 7:54 PM
Theo Platt
12/06/2021, 7:54 PM
Kevin Kho
Theo Platt
12/06/2021, 8:14 PM
Kevin Kho
Theo Platt
12/06/2021, 8:17 PM
Kevin Kho
Theo Platt
12/06/2021, 8:18 PM
Theo Platt
12/06/2021, 8:19 PM
Kevin Kho
Kevin Kho
Theo Platt
12/06/2021, 8:32 PM
Theo Platt
12/06/2021, 8:34 PM
@task
def wait_batch(job_id, delay, max_attempts):
    logger = prefect.context.get('logger')
    logger.info(f"Waiting for job to complete: {job_id}")
    waiter = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    )
    waiter.run(
        waiter_kwargs={
            'jobs': [job_id],
            'WaiterConfig': {
                'Delay': delay,
                'MaxAttempts': max_attempts
            }
        },
    )
    logger.info(f"Job complete: {job_id}")
    return job_id
Kevin Kho