# ask-community
k
Does anyone have a working example of using `BatchSubmit` and `AWSClientWait` in order to submit a job and wait for it?
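```python
from prefect import Flow, Parameter
from prefect.tasks.aws.batch import BatchSubmit
from prefect.tasks.aws.client_waiter import AWSClientWait

with Flow('flow') as flow:

    batch_input = Parameter('batch_input')

    # Note: calling .run() here executes the task immediately, while the flow
    # is still being built, so `batch_input` is the Parameter object itself
    # rather than its runtime value.
    job_res = BatchSubmit(
        job_name='job_name',
        job_definition='job-def',
        job_queue='job-queue',
    ).run(
        batch_kwargs={
            'parameters': {
                'batch_input': batch_input,
            },
        },
    )

    wait_res = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    ).run(
        waiter_kwargs={
            'jobs': [job_res]
        }
    )
```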
This is what I have. It feels a bit weird to call `.run` directly on the task (I am new at this and have mostly just run `@task`-decorated functions). Additionally, boto is complaining that the `batch_input` values need to be strings:

```
type: <class 'prefect.core.parameter.Parameter'>, valid types: <class 'str'>
```
k
I think you can do:

```python
BatchSubmit(
    job_name='job_name',
    job_definition='job-def',
    job_queue='job-queue',
)(
    batch_kwargs={
        'parameters': {
            'batch_input': batch_input,
        },
    },
)
```
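And you can also do this:

```python
from prefect import Flow, Parameter
from prefect.tasks.aws.batch import BatchSubmit
from prefect.tasks.aws.client_waiter import AWSClientWait

# Instantiate the tasks up front (these arguments go to __init__)
job_res = BatchSubmit(
    job_name='job_name',
    job_definition='job-def',
    job_queue='job-queue',
)

wait_res = AWSClientWait(
    client='batch',
    waiter_name='JobComplete',
)

with Flow('flow') as flow:
    batch_input = Parameter('batch_input')
    # Calling the task instance adds it to the flow; these arguments go to run()
    a = job_res(
        batch_kwargs={
            'parameters': {
                'batch_input': batch_input,
            },
        },
    )
    # Pass `a` (the submitted job's ID at run time), not the task object `job_res`
    b = wait_res(
        waiter_kwargs={
            'jobs': [a]
        }
    )
```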
In the first snippet, the first set of parentheses is passed to `__init__`. The second is passed to `run`.
k
Thanks again for the help @Kevin Kho
t
@Kevin Kho Following on from this example, how do you change the default `delay` and `maxAttempts` defined in batch.json? https://github.com/PrefectHQ/prefect/blob/master/src/prefect/tasks/aws/waiters/batch.json#L84-L104 Thank you!
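Ignore that, solved it:

```python
waiter.run(
    waiter_kwargs={
        'jobs': [job_id],
        # WaiterConfig overrides the delay/maxAttempts from the waiter definition
        'WaiterConfig': {
            'Delay': 5,  # seconds to wait between polls
            'MaxAttempts': 1000,
        },
    },
)
```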
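For reference, those two values bound how long the wait can run. A minimal sketch, assuming botocore's standard waiter loop (poll, then sleep `Delay` seconds, giving up after `MaxAttempts` polls); `build_waiter_kwargs` and `max_sleep_seconds` are hypothetical helper names, not Prefect or boto3 APIs:

```python
from typing import Dict, List


def build_waiter_kwargs(job_ids: List[str], delay: int = 5, max_attempts: int = 1000) -> Dict:
    """Hypothetical helper: waiter_kwargs for a Batch 'JobComplete' wait with a custom WaiterConfig."""
    return {
        "jobs": list(job_ids),
        "WaiterConfig": {"Delay": delay, "MaxAttempts": max_attempts},
    }


def max_sleep_seconds(waiter_kwargs: Dict) -> int:
    """Total sleep before the waiter gives up.

    Assumes the waiter sleeps Delay seconds between polls but not after the
    final poll, i.e. (MaxAttempts - 1) sleeps. API call time is not included.
    """
    config = waiter_kwargs["WaiterConfig"]
    return config["Delay"] * (config["MaxAttempts"] - 1)


kwargs = build_waiter_kwargs(["example-job-id"])  # placeholder job ID
print(max_sleep_seconds(kwargs))  # 4995 seconds, about 83 minutes
```

The trade-off when tuning: a shorter `Delay` notices job completion sooner but makes more polling API calls over the same window.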
k
Glad you got that figured out 👍
k
Following back up on this AWS `BatchSubmit` task question: I'm wondering if this can be scaled out using `map`. I have the following example:
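```python
from typing import Dict, List

from prefect import Flow, task
from prefect.tasks.aws.batch import BatchSubmit

@task
def setup_runs(....) -> List[Dict]:
    runs = []  # a list, since one dict is appended per job
    for foo in bar:
        ...
        runs.append(dict(
            job_name=job_name,
            job_definition=JOB_DEF,
            job_queue=JOB_QUEUE,
            # BatchSubmit.run takes this keyword as `batch_kwargs`
            batch_kwargs=dict(
                parameters=dict(
                    ...
                )
            )
        ))
    return runs


with Flow('flow') as flow:
    runs = setup_runs(...)
    # Each mapped child receives one dict from `runs`
    batch_jobs = BatchSubmit().map(runs)
```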
I'm hoping to have one task generate a list of job inputs and then map that list across `BatchSubmit` tasks. Finally, I want to set up a map of client waiters. Any ideas?
If I debug at `prefect/tasks/aws/batch.py(71)run()`, I can see that my individual payload from the map was passed to `run`, but instead of expanding the dict into keyword arguments, it just passed the whole dict as the first argument:
```
prefect/tasks/aws/batch.py(71)run()
     70         import ipdb; ipdb.set_trace()
---> 71         if not job_name:
     72             raise ValueError("A job name must be provided.")

ipdb> job_name
{'job_name': 'job', 'job_definition': 'def-latest', 'job_queue': 'queue', 'boto_kwargs': {'parameters': {...}}}}
```
The `run` signature is:

```python
@defaults_from_attrs("job_name", "job_definition", "job_queue")
def run(
    self,
    job_name: str = None,
    job_definition: str = None,
    job_queue: str = None,
    batch_kwargs: dict = None,
    credentials: str = None,
):
    ...
```
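The `@defaults_from_attrs` decorator is why values given to `__init__` act as defaults for `run`: any listed argument left as `None` is filled from the task attribute of the same name. It also explains the debugger output, since the positional argument in `.map(runs)` is bound to the first parameter of `run`, which is `job_name`. Below is a toy re-implementation in plain Python to illustrate the idea; `defaults_from_attrs_toy` and `FakeBatchSubmit` are hypothetical names, and Prefect's real decorator differs in detail:

```python
import functools
import inspect
from typing import Any, Callable


def defaults_from_attrs_toy(*attr_names: str) -> Callable:
    """Toy sketch: fill any listed run() argument left as None from self.<name>."""

    def decorator(run_method: Callable) -> Callable:
        @functools.wraps(run_method)
        def wrapper(self, *args: Any, **kwargs: Any) -> Any:
            # Bind positional and keyword args so we can see which were left as None
            bound = inspect.signature(run_method).bind(self, *args, **kwargs)
            bound.apply_defaults()
            for name in attr_names:
                if bound.arguments.get(name) is None:
                    bound.arguments[name] = getattr(self, name)
            return run_method(*bound.args, **bound.kwargs)

        return wrapper

    return decorator


class FakeBatchSubmit:
    """Hypothetical stand-in for a task: __init__ stores defaults, run() does the work."""

    def __init__(self, job_name: str = None, job_queue: str = None) -> None:
        self.job_name = job_name
        self.job_queue = job_queue

    @defaults_from_attrs_toy("job_name", "job_queue")
    def run(self, job_name: str = None, job_queue: str = None) -> str:
        return f"submit {job_name} to {job_queue}"


submit = FakeBatchSubmit(job_name="job_name", job_queue="job-queue")
print(submit.run())                     # submit job_name to job-queue
print(submit.run(job_name="override"))  # submit override to job-queue
# A whole dict passed as the first positional argument lands in job_name:
print(submit.run({"job_name": "job"}))  # submit {'job_name': 'job'} to job-queue
```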
k
I think you might need an intermediate task that splits that list of dicts into five lists (one per `run` argument) and then pass those as inputs to the `map` call of `BatchSubmit`. Or, if you want to take in the whole dictionary, you could maybe subclass `BatchSubmit`, split out the dict inside `run`, and pass the values on.
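The first suggestion amounts to transposing a list of dicts into one list per `run` argument. A minimal sketch of that reshaping in plain Python, assuming each dict uses the keyword names from the `run` signature; `split_runs` and `RUN_ARGS` are hypothetical names, and in a flow you would decorate `split_runs` with `@task`:

```python
from typing import Any, Dict, List

# Parameter names of BatchSubmit.run, copied from the signature quoted in this thread.
RUN_ARGS = ["job_name", "job_definition", "job_queue", "batch_kwargs", "credentials"]


def split_runs(runs: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Turn [{job_name: ..., ...}, ...] into {job_name: [...], ...}.

    Keys missing from a dict become None, matching the defaults in run()'s signature.
    """
    unknown = {key for run in runs for key in run} - set(RUN_ARGS)
    if unknown:
        # Catch misspelled keys early instead of failing inside the mapped tasks
        raise KeyError(f"not BatchSubmit.run arguments: {sorted(unknown)}")
    return {name: [run.get(name) for run in runs] for name in RUN_ARGS}


runs = [
    {"job_name": "job-1", "job_definition": "def-latest", "job_queue": "queue",
     "batch_kwargs": {"parameters": {"batch_input": "a"}}},
    {"job_name": "job-2", "job_definition": "def-latest", "job_queue": "queue"},
]
columns = split_runs(runs)
print(columns["job_name"])      # ['job-1', 'job-2']
print(columns["batch_kwargs"])  # [{'parameters': {'batch_input': 'a'}}, None]
```

Once `split_runs` is a task, indexing its result (e.g. `columns['job_name']`) should give per-argument inputs for `BatchSubmit().map(job_name=..., job_definition=..., ...)`. Mapped lists must all have the same length, which this reshaping guarantees.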
k
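I updated my `setup_runs` task to return the submitted job IDs:

```python
# Inside a task, .run() executes immediately and returns the AWS Batch job ID
return [BatchSubmit().run(**run) for run in runs]
```

And then I just did:

```python
runs = setup_runs(...)
# `runs` is now the list of job IDs, so one waiter can wait on all of them
wait_res = AWSClientWait(
    client='batch',
    waiter_name='JobComplete',
)(
    waiter_kwargs={
        'jobs': runs,
    },
)
```

It seems to be working.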
k
Ah, I see, that would work, yep. I just don't know about AWS API limits; they seem to be so low.
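On the limits point: AWS Batch's `DescribeJobs` API accepts at most 100 job IDs per request, and the `JobComplete` waiter appears to poll `DescribeJobs` (per the batch.json waiter definition linked earlier), so a single `'jobs'` list with more than 100 IDs would likely be rejected. A minimal sketch of splitting the IDs into groups, assuming you then run one waiter per group; `chunk_job_ids` is a hypothetical helper:

```python
from typing import List

# AWS Batch DescribeJobs accepts a list of up to 100 job IDs per call.
DESCRIBE_JOBS_MAX_IDS = 100


def chunk_job_ids(job_ids: List[str], size: int = DESCRIBE_JOBS_MAX_IDS) -> List[List[str]]:
    """Split job IDs into consecutive groups of at most `size` IDs."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [job_ids[i:i + size] for i in range(0, len(job_ids), size)]


job_ids = [f"job-{n}" for n in range(250)]  # placeholder IDs
chunks = chunk_job_ids(job_ids)
print([len(chunk) for chunk in chunks])  # [100, 100, 50]

# One waiter_kwargs dict per group, e.g. to map an AWSClientWait task over
waiter_kwargs_list = [{"jobs": chunk} for chunk in chunks]
```

In a Prefect 1.x flow, that list could feed `AWSClientWait(client='batch', waiter_name='JobComplete').map(waiter_kwargs=...)`, giving one waiter per group. If throttling is the concern, a larger `Delay` in `WaiterConfig` also reduces how often each waiter polls.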