Kyle McChesney
07/30/2021, 4:07 PM
How should I combine BatchSubmit and AWSClientWait in order to submit a job and wait for it?
with Flow('flow'):
    batch_input = Parameter('batch_input')
    job_res = BatchSubmit(
        job_name='job_name',
        job_definition='job-def',
        job_queue='job-queue',
    ).run(
        batch_kwargs={
            'parameters': {
                'batch_input': batch_input,
            },
        },
    )
    wait_res = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    ).run(
        waiter_kwargs={
            'jobs': [job_res]
        }
    )
This is what I have. Feels a bit weird to call .run directly on the task (I am new at this and have mostly just run @task annotated functions). Additionally, boto is complaining that the batch_input values need to be strings: type: <class 'prefect.core.parameter.Parameter'>, valid types: <class 'str'>
Kevin Kho
BatchSubmit(
    job_name='job_name',
    job_definition='job-def',
    job_queue='job-queue',
)(
    batch_kwargs={
        'parameters': {
            'batch_input': batch_input,
        },
    },
)
Kevin Kho
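from prefect import Flow, Parameter
from prefect.tasks.aws.batch import BatchSubmit
from prefect.tasks.aws.client_waiter import AWSClientWait

# Configure the tasks once, outside the flow (this is the init)
job_res = BatchSubmit(
    job_name='job_name',
    job_definition='job-def',
    job_queue='job-queue',
)
wait_res = AWSClientWait(
    client='batch',
    waiter_name='JobComplete',
)
with Flow('flow'):
    batch_input = Parameter('batch_input')
    # Calling the task instance registers it in the flow (this is the run)
    a = job_res(
        batch_kwargs={
            'parameters': {
                'batch_input': batch_input,
            },
        },
    )
    b = wait_res(
        waiter_kwargs={
            # pass the submit result (a), not the task object (job_res)
            'jobs': [a]
        }
    )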
Kevin Kho
The first set of parentheses is the init. The second is the run.
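To make the init/run distinction concrete, here is a toy model in plain Python. It is not Prefect's implementation: Placeholder and ToyTask are hypothetical stand-ins for Parameter and a task class. It only sketches why calling .run inside the flow block hands the task the placeholder object itself (the source of the "valid types: <class 'str'>" complaint), while calling the instance defers the work until a real value exists.

```python
class Placeholder:
    """Hypothetical stand-in for a Parameter: a named value known only at run time."""

    def __init__(self, name):
        self.name = name


def _resolve(arg, values):
    # Swap a placeholder for its run-time value; pass anything else through.
    return values[arg.name] if isinstance(arg, Placeholder) else arg


class ToyTask:
    """Hypothetical stand-in for a task class such as BatchSubmit."""

    def __init__(self, job_name):
        # First set of parentheses: static configuration.
        self.job_name = job_name

    def run(self, batch_input):
        # The actual work; it needs a concrete string.
        if not isinstance(batch_input, str):
            raise TypeError(f"expected str, got {type(batch_input).__name__}")
        return f"{self.job_name}:{batch_input}"

    def __call__(self, batch_input):
        # Second set of parentheses: record the call, do not execute yet.
        return lambda values: self.run(_resolve(batch_input, values))


batch_input = Placeholder("batch_input")
task = ToyTask(job_name="job_name")

# Calling .run at build time passes the placeholder itself and fails.
try:
    task.run(batch_input)
    build_time_error = None
except TypeError as exc:
    build_time_error = str(exc)

# Calling the instance defers execution until values are supplied.
deferred = task(batch_input)
result = deferred({"batch_input": "hello"})
```

In this sketch the direct .run call fails because it sees a Placeholder, while the deferred call succeeds once "hello" is supplied.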
Kyle McChesney
07/30/2021, 4:27 PM
Theo Platt
08/20/2021, 11:00 PM
Theo Platt
08/20/2021, 11:32 PM
waiter.run(
    waiter_kwargs={
        'jobs': [job_id],
        'WaiterConfig': {
            'Delay': 5,
            'MaxAttempts': 1000
        }
    },
)
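As a rough aid for picking Delay and MaxAttempts, the sketch below builds the same waiter_kwargs dictionary and estimates how long the waiter would poll before giving up. The helper names build_waiter_kwargs and max_wait_seconds are hypothetical, and the estimate ignores the time each API request takes.

```python
from typing import Any, Dict, List


def build_waiter_kwargs(
    job_ids: List[str], delay: int = 5, max_attempts: int = 1000
) -> Dict[str, Any]:
    """Build waiter kwargs in the shape used above (hypothetical helper)."""
    if delay <= 0 or max_attempts <= 0:
        raise ValueError("delay and max_attempts must be positive")
    return {
        "jobs": list(job_ids),
        "WaiterConfig": {
            "Delay": delay,  # seconds between polling attempts
            "MaxAttempts": max_attempts,  # attempts before the waiter gives up
        },
    }


def max_wait_seconds(waiter_kwargs: Dict[str, Any]) -> int:
    """Approximate upper bound on total polling time, in seconds."""
    config = waiter_kwargs["WaiterConfig"]
    return config["Delay"] * config["MaxAttempts"]


# "job-id-123" is a made-up job id used only for illustration.
kwargs = build_waiter_kwargs(["job-id-123"], delay=5, max_attempts=1000)
budget = max_wait_seconds(kwargs)
```

With Delay 5 and MaxAttempts 1000, the waiter polls for roughly 5000 seconds (about 83 minutes) before timing out.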
Theo Platt
08/20/2021, 11:33 PM
Kevin Kho
Kyle McChesney
09/30/2021, 7:50 PM
from typing import Dict, List

from prefect import Flow, task
from prefect.tasks.aws.batch import BatchSubmit

@task
def setup_runs(....) -> List[Dict]:
    runs = []  # a list, so that .append works
    for foo in bar:
        ...
        runs.append(dict(
            job_name=job_name,
            job_definition=JOB_DEF,
            job_queue=JOB_QUEUE,
            boto_kwargs=dict(
                parameters=dict(
                    ...
                )
            )
        ))
    return runs

with Flow(
    'flow',
) as flow:
    runs = setup_runs(...)
    batch_jobs = BatchSubmit.map(runs)
Hoping to have one task generate a list of job inputs, and then map that across batch submit tasks. Finally, I want to set up a map of client waiters. Any ideas?
Kyle McChesney
09/30/2021, 7:57 PM
It called run, but instead of expanding the kwargs from the dict, it just passed the whole dict as the first argument:
prefect/tasks/aws/batch.py(71)run()
     70         import ipdb; ipdb.set_trace()
---> 71         if not job_name:
     72             raise ValueError("A job name must be provided.")
ipdb> job_name
{'job_name': 'job', 'job_definition': 'def-latest', 'job_queue': 'queue', 'boto_kwargs': {'parameters': {...}}}
The signature is
@defaults_from_attrs("job_name", "job_definition", "job_queue")
def run(
    self,
    job_name: str = None,
    job_definition: str = None,
    job_queue: str = None,
    batch_kwargs: dict = None,
    credentials: str = None,
):
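The mismatch between a list of dicts and this signature can be reproduced without Prefect. In the sketch below, submit is a hypothetical stand-in that copies the parameter names from the signature above and simply returns what it received. The job names and queue values are made up.

```python
from typing import Any, Dict, List, Optional


def submit(
    job_name: Optional[Any] = None,
    job_definition: Optional[str] = None,
    job_queue: Optional[str] = None,
    batch_kwargs: Optional[dict] = None,
) -> Dict[str, Any]:
    """Hypothetical stand-in for run(); echoes the arguments it was given."""
    if not job_name:
        raise ValueError("A job name must be provided.")
    return {
        "job_name": job_name,
        "job_definition": job_definition,
        "job_queue": job_queue,
        "batch_kwargs": batch_kwargs,
    }


runs: List[Dict[str, Any]] = [
    {"job_name": "job-a", "job_definition": "def-latest", "job_queue": "queue"},
    {"job_name": "job-b", "job_definition": "def-latest", "job_queue": "queue"},
]

# Passing each element as-is: the whole dict lands in the first
# parameter, job_name, which matches the debugger output above.
positional = [submit(run) for run in runs]

# Unpacking each element: keys are matched to parameter names.
unpacked = [submit(**run) for run in runs]
```

Unpacking only works when every key is a parameter name. A key that is not in the signature (for example boto_kwargs where the signature expects batch_kwargs) would raise a TypeError under `**` unpacking.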
Kevin Kho
BatchSubmit. Or, if you need to take in the whole dictionary, you can subclass BatchSubmit, split out the dict, and pass the pieces on, maybe.
Kyle McChesney
09/30/2021, 8:37 PM
return [BatchSubmit().run(**run) for run in runs]
And then I just did
runs = setup_runs(...)
wait_res = AWSClientWait(
    client='batch',
    waiter_name='JobComplete',
)(
    waiter_kwargs={
        'jobs': runs,
    },
)
seems to be working
Kevin Kho