Kyle Austin

02/23/2023, 3:46 PM
I have a question about working with the ConcurrentTaskRunner. I have a flow with a potentially large number (5k in my test runs) of independent invocations of a simple async task that I would like to submit to the task runner so it can work on them concurrently. The flow submits all 5k to the task runner at the same time, and I am getting many, many warning messages like
10:44:28.065 | WARNING | Task run 'email_send_message-357' - Task run '52fa4b9c-deb0-407c-a992-ccde4685dfcd' received abort during orchestration: The enclosing flow must be running to begin task execution. Task run is in PENDING state.
and I am almost always getting the following error message
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
I am setting
persist_result=True
in all my task decorators too. Plus I have set concurrency limit tags so no more than 30 of these tasks run at once. But all 5k tasks are still being submitted and created at once! Here is roughly how the code looks in the flow now:
humana_smtp = EmailServerCredentials.load("some-smtp-server")
for email in emails:
    email_send_message.submit(
        subject=email.subject,
        msg=email.rendered_html_template,
        email_server_credentials=humana_smtp,
        email_from=email.email_from,
        email_to=email.to,
        email_to_cc=email.cc,
        email_to_bcc=email.bcc,
        attachments=email.attachments,
        images=email.images,
        dry_run=dry_run,
    )
I have done something like this to prevent it from submitting all 5k at once and throttle it down to working on 50 at a time:
email_chunks_for_sending = chunkify(emails, 50)
humana_smtp = EmailServerCredentials.load("some-smtp-server")
for chunk in email_chunks_for_sending:
    wait_for_complete_object = []
    for email in chunk:
        sent = email_send_message.submit(
            subject=email.subject,
            msg=email.rendered_html_template,
            email_server_credentials=humana_smtp,
            email_from=email.email_from,
            email_to=email.to,
            email_to_cc=email.cc,
            email_to_bcc=email.bcc,
            attachments=email.attachments,
            images=email.images,
            dry_run=dry_run,
        )
        wait_for_complete_object.append(sent)
    [future.result() for future in wait_for_complete_object]
The chunkify function here, which I borrowed from another post on Slack, looks like:
def chunkify(xs, size):
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))
Is there a way to set a limit on the number of tasks that are submitted to the task runner at a given time? Task concurrency didn't do the trick for me -- it only limited the number of tasks running at a given time, not the number submitted.

Tim-Oliver

02/23/2023, 4:13 PM
I also use a buffer queue to limit the number of submitted tasks; otherwise I run out of memory on my system. This is the function I use to drain the buffer queue:
from prefect.futures import PrefectFuture


def wait_for_task_runs(
    results: list,
    buffer: list[PrefectFuture],
    max_buffer_length: int = 6,
):
    # Block until the buffer has room again, collecting results of the oldest futures first.
    while len(buffer) >= max(1, max_buffer_length):
        results.append(buffer.pop(0).result())
I call it inside the for-loop that submits the tasks (see the sketch below). That way a new task can be submitted as soon as the first task in the buffer queue has finished.
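For example, a minimal sketch of how this could be wired into the email submission loop from above (reusing the names emails, email_send_message, and humana_smtp from the earlier snippets; the buffer length of 30 is just an assumed value, not something prescribed here):
results = []
buffer = []

for email in emails:
    # Wait until there is room in the buffer before submitting another task run.
    wait_for_task_runs(results, buffer, max_buffer_length=30)
    buffer.append(
        email_send_message.submit(
            subject=email.subject,
            msg=email.rendered_html_template,
            email_server_credentials=humana_smtp,
            email_from=email.email_from,
            email_to=email.to,
            dry_run=dry_run,
            # other arguments omitted for brevity
        )
    )

# Drain whatever is still in flight at the end.
wait_for_task_runs(results, buffer, max_buffer_length=1)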

Kyle Austin

02/23/2023, 4:31 PM
This is a really cool idea @Tim-Oliver! I was thinking of something similar, but waaaaay less slick! It would be great if Prefect task submission had a parameter indicating the max number to submit at the same time, implementing essentially what you have written above behind the scenes.

Zanie

02/23/2023, 5:27 PM
@Andrew Huang concurrency limits are applied after submission
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
This is not the cause of your problem but a symptom. The API is returning a PENDING state for the task run, which has no data attached to it.
10:44:28.065 | WARNING | Task run 'email_send_message-357' - Task run '52fa4b9c-deb0-407c-a992-ccde4685dfcd' received abort during orchestration: The enclosing flow must be running to begin task execution. Task run is in PENDING state.
Similarly, this is probably a symptom. It looks like the flow has been reported as crashed or failed while task submission is still occurring in the background.
There was a recent GitHub issue asking for limits to be added here. Maybe we can introduce a setting that controls local concurrent submissions.

Kyle Austin

02/23/2023, 6:35 PM
Thanks @Zanie! Indeed, issue 8539 would be an ideal solution in my opinion.
I'll keep an eye on its progress.

Zanie

02/23/2023, 7:00 PM
That issue asks us to enforce the server-side concurrency limits before submission, but that’s not particularly feasible. What do you think about my alternate proposal?

Kyle Austin

02/23/2023, 7:09 PM
That would be most satisfactory. Thanks Zanie!

Tim-Oliver

02/24/2023, 10:09 AM
Would such a local concurrent submission limit be applicable per task (i.e. taskA has a limit of 2, taskB has a limit of 5)? And does local in that context mean local to the flow-run?

Kyle Austin

02/24/2023, 11:18 AM
That's actually a really good point @Tim-Oliver. When using your buffer-queue technique (which really helped a ton, BTW), I do have different limits set for different kinds of tasks.

Zanie

02/24/2023, 4:07 PM
Hmm we could limit it per task. And yeah, local to a flow run / client side.
Then it's not a setting though, it's a
@task
option, which I'm not sure I'm as excited about.

Tim-Oliver

02/24/2023, 4:15 PM
This is not a pressing issue for me, since I can use the buffer-queue approach. The reason I have to do this is that my task runs require a substantial amount of memory (I am processing images), and number-of-task-runs x memory-required-per-task-run exceeds my infrastructure limit. Having an option in
@task
would allow me fine-grained control. That being said, I am also open to trying other strategies.

Kyle Austin

02/24/2023, 8:21 PM
I definitely would be most appreciative of a global client option. That would probably handle almost all the cases I really need, and like Tim says, I can use a little custom logic for the extreme situations.

Zanie

02/24/2023, 8:25 PM
Thanks for the feedback! We’re exploring ways to extend concurrency limits to solve these problems as well.