Kyle McChesney

    1 year ago
    Does anyone have a working example of using BatchSubmit and AWSClientWait to submit a job and wait for it?
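    from prefect import Flow, Parameter
    from prefect.tasks.aws.batch import BatchSubmit
    from prefect.tasks.aws.client_waiter import AWSClientWait
    
    with Flow('flow'):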
    
        batch_input = Parameter('batch_input')
    
        job_res = BatchSubmit(
            job_name='job_name',
            job_definition='job-def',
            job_queue='job-queue',
        ).run(
            batch_kwargs={
                'parameters': {
                    'batch_input': batch_input,
                },
            },
        )
    
        wait_res = AWSClientWait(
            client='batch',
            waiter_name='JobComplete',
        ).run(
            waiter_kwargs={
                'jobs': [job_res]
            }
        )
    This is what I have. It feels a bit weird to call .run directly on the task (I'm new to this and have mostly just run @task-decorated functions). Additionally, boto is complaining that the batch_input values need to be strings:
    type: <class 'prefect.core.parameter.Parameter'>, valid types: <class 'str'>
    Kevin Kho

    1 year ago
    I think you can do:
    BatchSubmit(
        job_name='job_name',
        job_definition='job-def',
        job_queue='job-queue',
    )(
        batch_kwargs={
            'parameters': {
                'batch_input': batch_input,
            },
        },
    )
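    And you can also do this:
    from prefect import Flow, Parameter
    from prefect.tasks.aws.batch import BatchSubmit
    from prefect.tasks.aws.client_waiter import AWSClientWait
    
    # Instantiate the tasks outside the flow (these are the __init__ calls)
    job_res = BatchSubmit(
        job_name='job_name',
        job_definition='job-def',
        job_queue='job-queue',
    )
    
    wait_res = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    )
    
    with Flow('flow'):
        batch_input = Parameter('batch_input')
        # Calling the instances inside the flow binds arguments for run()
        a = job_res(
            batch_kwargs={
                'parameters': {
                    'batch_input': batch_input,
                },
            },
        )
        # Wait on the submitted job's result (its job ID), not the job_res task itself
        b = wait_res(
            waiter_kwargs={
                'jobs': [a]
            }
        )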
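    In the first snippet, the first set of parentheses is the __init__ call and the second supplies the arguments for run. Calling .run() directly inside the Flow block executes the task immediately, while the flow is still being built, so boto receives the Parameter object itself rather than its value; that is where the str type error comes from.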
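To make the __init__/run distinction concrete, here is a toy, pure-Python model of deferred execution. ToyTask, ToyFlow, ToyParameter and value_type are hypothetical names invented for illustration; this is not how Prefect is implemented internally, only a sketch of the idea that calling .run() executes immediately with the placeholder object, while calling the task instance inside the flow just records the call until the flow runs.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Module-level "current flow", loosely mimicking how a `with Flow(...)` block
# collects tasks. Everything here is a hypothetical toy, not Prefect code.
_current_flow: Optional["ToyFlow"] = None


class ToyParameter:
    """Placeholder whose real value is only known when the flow runs."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyTask:
    def __init__(self, fn: Callable[..., Any]) -> None:
        # First set of parentheses: configure the task.
        self.fn = fn

    def run(self, **kwargs: Any) -> Any:
        # Executes right now, with whatever objects it is handed.
        return self.fn(**kwargs)

    def __call__(self, **kwargs: Any) -> None:
        # Second set of parentheses inside a flow: record the arguments
        # for a later run() instead of executing immediately.
        if _current_flow is None:
            raise RuntimeError("call tasks inside a `with ToyFlow()` block")
        _current_flow.calls.append((self, kwargs))


class ToyFlow:
    def __init__(self) -> None:
        self.calls: List[Tuple[ToyTask, Dict[str, Any]]] = []

    def __enter__(self) -> "ToyFlow":
        global _current_flow
        _current_flow = self
        return self

    def __exit__(self, *exc: Any) -> None:
        global _current_flow
        _current_flow = None

    def run(self, **params: Any) -> List[Any]:
        results = []
        for task, kwargs in self.calls:
            # Swap each placeholder for its real value at run time.
            resolved = {
                key: params[val.name] if isinstance(val, ToyParameter) else val
                for key, val in kwargs.items()
            }
            results.append(task.run(**resolved))
        return results


def value_type(batch_input: Any) -> str:
    return type(batch_input).__name__


with ToyFlow() as flow:
    batch_input = ToyParameter("batch_input")
    eager = ToyTask(value_type).run(batch_input=batch_input)  # runs now
    ToyTask(value_type)(batch_input=batch_input)              # deferred

print(eager)                        # ToyParameter
print(flow.run(batch_input="abc"))  # ['str']
```

The eager call sees the placeholder object, which mirrors the "valid types: <class 'str'>" complaint from boto; the deferred call only sees a real string once the flow is run with a value.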
    Kyle McChesney

    1 year ago
    Thanks again for the help @Kevin Kho

    Theo Platt

    1 year ago
    @Kevin Kho Following on from this example, how do you change the default delay and maxAttempts defined in batch.json? https://github.com/PrefectHQ/prefect/blob/master/src/prefect/tasks/aws/waiters/batch.json#L84-L104 Thank you!
    Ignore that, I solved it by passing a WaiterConfig in waiter_kwargs:
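    # waiter is an AWSClientWait(client='batch', waiter_name='JobComplete') task;
    # job_id is the job ID returned by BatchSubmit
    waiter.run(
        waiter_kwargs={
            'jobs': [job_id],
            # Overrides the delay / maxAttempts defaults from batch.json
            'WaiterConfig': {
                'Delay': 5,  # seconds between polls
                'MaxAttempts': 1000,
            },
        },
    )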
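As a rough sanity check on those numbers, here is a small sketch that builds the same waiter_kwargs and estimates the longest the waiter could keep polling. make_waiter_kwargs, max_wait_seconds and the job ID are hypothetical; the estimate assumes the underlying boto waiter sleeps Delay seconds between up to MaxAttempts polls, so Delay * MaxAttempts is only an approximate upper bound (it ignores request latency).

```python
from typing import Any, Dict, List


def make_waiter_kwargs(
    job_ids: List[str], delay: int = 5, max_attempts: int = 1000
) -> Dict[str, Any]:
    """Build waiter_kwargs for AWSClientWait; WaiterConfig overrides the waiter's defaults."""
    return {
        "jobs": job_ids,
        "WaiterConfig": {"Delay": delay, "MaxAttempts": max_attempts},
    }


def max_wait_seconds(waiter_kwargs: Dict[str, Any]) -> int:
    """Approximate upper bound on polling time: Delay seconds between up to MaxAttempts polls."""
    cfg = waiter_kwargs["WaiterConfig"]
    return cfg["Delay"] * cfg["MaxAttempts"]


kwargs = make_waiter_kwargs(["example-job-id"])  # hypothetical job ID
print(max_wait_seconds(kwargs))  # 5000
```

With Delay=5 and MaxAttempts=1000 that is about 5000 seconds, roughly 83 minutes, before the waiter gives up.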
    Kevin Kho

    1 year ago
    Glad you got that figured out 👍
    Kyle McChesney

    11 months ago
    Following up on this BatchSubmit question: I'm wondering whether this can be scaled out using map. I have the following example:
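    from typing import Dict, List
    
    from prefect import Flow, task
    from prefect.tasks.aws.batch import BatchSubmit
    
    @task
    def setup_runs(....) -> List[Dict]:
        runs = []  # a list, since one dict is appended per job
        for foo in bar:
            ...
            runs.append(dict(
                job_name=job_name,
                job_definition=JOB_DEF,
                job_queue=JOB_QUEUE,
                batch_kwargs=dict(  # run() takes batch_kwargs
                    parameters=dict(
                        ...
                    )
                )
            ))
        return runs
    
    
    with Flow(
        'flow',
    ) as flow:
        runs = setup_runs(...)
        # map is called on a task instance, hence BatchSubmit()
        batch_jobs = BatchSubmit().map(runs)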
    I'm hoping to have one task generate a list of job inputs and then map that across BatchSubmit tasks. Finally, I want to set up a map of client waiters. Any ideas?
    If I debug at prefect/tasks/aws/batch.py(71)run(), I can see that my individual payload from the map was passed to run, but instead of expanding the dict into keyword arguments, it passed the whole dict as the first argument (job_name):
    prefect/tasks/aws/batch.py(71)run()
         70         import ipdb; ipdb.set_trace()
    ---> 71         if not job_name:
         72             raise ValueError("A job name must be provided.")
    
    ipdb> job_name
    {'job_name': 'job', 'job_definition': 'def-latest', 'job_queue': 'queue', 'boto_kwargs': {'parameters': {...}}}
    The signature is:
    @defaults_from_attrs("job_name", "job_definition", "job_queue")
    def run(
        self,
        job_name: str = None,
        job_definition: str = None,
        job_queue: str = None,
        batch_kwargs: dict = None,
        credentials: str = None,
    ):
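The debug output above is what you would expect from how Python binds a single positional argument: it goes to the first parameter. A minimal sketch with a hypothetical fake_run that mirrors the run() signature (minus self) shows the difference between passing a dict positionally and unpacking it with **:

```python
from typing import Any, Dict, Optional


def fake_run(
    job_name: Optional[Any] = None,
    job_definition: Optional[str] = None,
    job_queue: Optional[str] = None,
    batch_kwargs: Optional[dict] = None,
    credentials: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical stand-in with BatchSubmit.run's parameters; echoes what it received."""
    return {
        "job_name": job_name,
        "job_definition": job_definition,
        "job_queue": job_queue,
        "batch_kwargs": batch_kwargs,
    }


payload = {
    "job_name": "job",
    "job_definition": "def-latest",
    "job_queue": "queue",
    "batch_kwargs": {"parameters": {}},
}

positional = fake_run(payload)  # the whole dict lands in job_name
unpacked = fake_run(**payload)  # keys are matched to parameter names
```

This suggests the fix is on the calling side: either unpack the dict yourself, or pass each argument to map separately.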
    Kevin Kho

    11 months ago
    I think you might need an intermediate task that splits those dicts into separate lists (one per run argument) and then passes those lists as inputs to the map call of BatchSubmit. Alternatively, if you want to take in the whole dictionary, you could subclass BatchSubmit so that it splits out the dict and passes the values through.
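A minimal sketch of that intermediate step, assuming the per-job dicts use the run() keyword names: split_runs and KEYS are hypothetical names, and the function is plain Python so it can be checked on its own before wrapping it in @task.

```python
from typing import Any, Dict, List

# Assumed keys: the BatchSubmit.run keyword arguments used in this thread.
KEYS = ("job_name", "job_definition", "job_queue", "batch_kwargs")


def split_runs(runs: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Turn a list of per-job dicts into one list per run() argument."""
    # Missing keys become None so every list keeps the same length.
    return {key: [run.get(key) for run in runs] for key in KEYS}


runs = [
    {"job_name": "a", "job_queue": "q", "batch_kwargs": {"parameters": {}}},
    {"job_name": "b", "job_queue": "q", "batch_kwargs": {"parameters": {}}},
]
cols = split_runs(runs)
print(cols["job_name"])  # ['a', 'b']
```

In a flow, the idea would be to wrap split_runs in @task and pass each list as a keyword argument to BatchSubmit().map(...), for example job_name=cols['job_name']; I have not run that against a real flow, so treat the wiring as an untested assumption.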
    Kyle McChesney

    11 months ago
    I updated my setup_runs task to return:
    return [BatchSubmit().run(**run) for run in runs]
    And then I just did:
    runs = setup_runs(...)
    wait_res = AWSClientWait(
        client='batch',
        waiter_name='JobComplete',
    )(
        waiter_kwargs={
            'jobs': runs,
        },
    )
    It seems to be working.
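One thing to watch with a single waiter over all runs: as far as I know, the JobComplete waiter polls via the Batch DescribeJobs API, which the AWS API reference documents as accepting up to 100 job IDs per call. Assuming that limit, here is a sketch of splitting the job IDs into chunks so each chunk gets its own AWSClientWait call (chunked and MAX_IDS_PER_DESCRIBE are hypothetical names):

```python
from typing import Iterator, List, Sequence

# Assumption: DescribeJobs accepts at most 100 job IDs per request.
MAX_IDS_PER_DESCRIBE = 100


def chunked(job_ids: Sequence[str], size: int = MAX_IDS_PER_DESCRIBE) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` job IDs, preserving order."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(job_ids), size):
        yield list(job_ids[start:start + size])


ids = [f"job-{i}" for i in range(250)]  # hypothetical job IDs
print([len(c) for c in chunked(ids)])   # [100, 100, 50]
```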
    Kevin Kho

    11 months ago
    Ah, I see, yep, that would work. I just don't know about the AWS API limits; they seem to be quite low.
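On the rate-limit concern, a common pattern is retrying throttled calls with exponential backoff and jitter. The sketch below is generic: with_backoff is a hypothetical helper, and is_throttle is a predicate you would supply yourself (for example, one that inspects a botocore ClientError's error code; the exact code AWS Batch returns for throttling is not assumed here). botocore also has its own configurable retry behavior, which may be simpler if it covers your case.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_backoff(
    fn: Callable[[], T],
    is_throttle: Callable[[Exception], bool],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying throttling errors with exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            # Re-raise anything that isn't throttling, or the final failure.
            if not is_throttle(exc) or attempt == max_attempts - 1:
                raise
            # Sleep a random amount in [0, base_delay * 2**attempt].
            sleep(random.uniform(0, base_delay * 2 ** attempt))
    raise AssertionError("unreachable")  # the loop always returns or raises
```

Injecting sleep keeps the helper testable without real delays; in practice you would wrap each submit call (for example, a lambda around BatchSubmit().run(**run)) rather than the whole list.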