    Matan Drory

    7 months ago
    Hello. We are working with prefect[aws]==0.15.3. We have a flow that generates a large number of batch jobs, submits them, splits them into chunks of 100, and generates an AWS waiter object per chunk. We have one very large job with over 450 batch calls. This job seems to be stuck even though it progressed to the end. When tracking the progress we can see that a few tasks are stuck as Mapped even though all child tasks are done, i.e. 472 successful definitions were created in create_ive_analysis, then all of them were submitted and chunked in submit_jobs_analysis, and then all 5 AWSClientWait tasks were done (we call it with map on the chunked job ids). The parent block is still in Mapped mode. Also, sometimes an AWSClientWait task fails and the job doesn't fail, it just stays there (again, this is with a mapped AWSClientWait).
    Wait code:
    wait_res = AWSClientWait(client='batch', waiter_name='JobComplete').map(waiter_kwargs=batched_waits)
    Where batched_waits is created by
    @task
    def submit_jobs_and_batch_job_awaits(jobs: List, batched_num=BATCH_CLIENT_WAIT_MAX_SIZE):
        submitted_jobs = [BatchSubmit().run(**job) for job in jobs]
        waits = []
        for i in range(0, len(submitted_jobs), batched_num):
            waits.append(
                {
                    'jobs': submitted_jobs[i : i + batched_num],
                    'WaiterConfig': {
                        'Delay': 10,
                        'MaxAttempts': 10000,
                    },
                }
            )
        return waits
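    For context, a rough sketch of how the pieces described above could be wired together in a Prefect 1.x flow; the Flow block below is reconstructed for illustration and only the task names come from this thread:
    from prefect import Flow
    from prefect.tasks.aws.client_waiter import AWSClientWait

    with Flow('ive') as flow:
        definitions = create_ive_analysis()  # arguments omitted, not shown here
        batched_waits = submit_jobs_and_batch_job_awaits(definitions)
        wait_res = AWSClientWait(client='batch', waiter_name='JobComplete').map(waiter_kwargs=batched_waits)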
    What could cause that?
    Kevin Kho

    7 months ago
    Hi @Matan Drory, I think this is related to the size of the batch, but I'm not sure. See this issue, it might be relevant.
    Actually, sorry, this looks like it ran. What is your executor?
    Matan Drory

    7 months ago
    local dask
    I see that the mapped jobs stay blue in successful finished jobs as well. I am more worried about the AWSClientWait failing and the job hanging.
    Kevin Kho

    7 months ago
    Can you try using processes instead of threads for the LocalDaskExecutor? They tend to be more stable
    LocalDaskExecutor(scheduler="processes")
    Matan Drory

    7 months ago
    Yeah, we are
    with Flow(
        'ive',
        executor=LocalDaskExecutor(scheduler='processes'),
        state_handlers=[
            startup_handler,
            completion_handler,
            ive_flow_external_job_updater,
        ],
    ) as flow:
    Kevin Kho

    7 months ago
    Let me ping the guy in the GitHub discussion and see if he has any ideas.
    Theo Platt

    7 months ago
    That's me then @Kevin Kho! @Matan Drory, we had similar trouble with AWSClientWait and ended up writing this task to wait on tranches of 100 job ids at a time:
    import prefect
    from prefect import task
    from prefect.tasks.aws.client_waiter import AWSClientWait

    @task
    def wait_batches(job_ids, delay, max_attempts):
        logger = prefect.context.get('logger')
    
        if len(job_ids) > 0:
            logger.info(f"Waiting for job(s) to complete: {job_ids}")
    
            waiter = AWSClientWait(
                client='batch',
                waiter_name='JobComplete',
            )
    
            aws_waiter_limit: int = 100  # AWS-imposed limit on batch.describe_jobs() in boto3
            tranched_job_ids = [job_ids[pos:pos + aws_waiter_limit] for pos in range(0, len(job_ids), aws_waiter_limit)]
            for tranch in tranched_job_ids:
                logger.info(f"Tranch: {tranch[0]}, {tranch[-1]}, {len(tranch)}")
                waiter.run(
                    waiter_kwargs={
                        'jobs': tranch,
                        'WaiterConfig': {
                            'Delay': delay,
                            'MaxAttempts': max_attempts
                        }
                    },
                )
    
        return
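    A minimal sketch of how a task like this could be wired into a flow; the flow, Parameter, and submit_jobs task below are illustrative placeholders, not our actual code:
    from prefect import Flow, Parameter, task
    from prefect.tasks.aws.batch import BatchSubmit

    @task
    def submit_jobs(job_definitions):
        # Submit every Batch job up front and collect the returned job ids
        return [BatchSubmit().run(**job) for job in job_definitions]

    with Flow('batch-wait-example') as flow:
        job_definitions = Parameter('job_definitions', default=[])
        job_ids = submit_jobs(job_definitions)
        # Delay/MaxAttempts mirror the waiter config used earlier in the thread
        wait_batches(job_ids, delay=10, max_attempts=10000)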
    Matan Drory

    7 months ago
    So you are submitting everything in one go and then waiting on one group at a time, instead of mapping the awaits concurrently?
    Theo Platt

    7 months ago
    Yes - exactly right!
    We have to wait until the last task finishes anyway
    Matan Drory

    7 months ago
    Yeah, it's just so strange that it matters, since the AWS wait tasks are just calls to boto3's describe batch job…
    I understand the 100 limit (it's built in to the describe call)
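    For reference, the describe call behind that limit is boto3's batch.describe_jobs, which accepts at most 100 job ids per request; a bare illustration with plain boto3 (not our actual code):
    import boto3

    batch = boto3.client('batch')

    def describe_in_chunks(job_ids, chunk_size=100):
        # describe_jobs accepts at most 100 job ids per call, hence the chunking above
        statuses = {}
        for pos in range(0, len(job_ids), chunk_size):
            resp = batch.describe_jobs(jobs=job_ids[pos:pos + chunk_size])
            for job in resp['jobs']:
                statuses[job['jobId']] = job['status']
        return statuses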
    Theo Platt

    7 months ago
    We tried a map of waiters like you did, but we experienced the same unreliability. Never got to the bottom of it.
    Matan Drory

    6 months ago
    @Kyle McChesney