Kyle Austin
02/23/2023, 3:46 PM
10:44:28.065 | WARNING | Task run 'email_send_message-357' - Task run '52fa4b9c-deb0-407c-a992-ccde4685dfcd' received abort during orchestration: The enclosing flow must be running to begin task execution. Task run is in PENDING state.
I am almost always getting the following error message:
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
I am setting persist_result=True in all my task decorators too. Plus, I have set concurrency limits via tags so that no more than 30 of these tasks run at once. But all 5k tasks are still being submitted and created all at once!
Here is roughly what the code in the flow looks like now:
humana_smtp = EmailServerCredentials.load("some-smtp-server")
for email in emails:
    email_send_message.submit(
        subject=email.subject,
        msg=email.rendered_html_template,
        email_server_credentials=humana_smtp,
        email_from=email.email_from,
        email_to=email.to,
        email_to_cc=email.cc,
        email_to_bcc=email.bcc,
        attachments=email.attachments,
        images=email.images,
        dry_run=dry_run,
    )
I have done something like this to stop it from submitting all 5k at once and to throttle it down to 50 at a time:
email_chunks_for_sending = chunkify(emails, 50)
humana_smtp = EmailServerCredentials.load("some-smtp-server")
for chunk in email_chunks_for_sending:
    wait_for_complete_object = []
    for email in chunk:
        sent = email_send_message.submit(
            subject=email.subject,
            msg=email.rendered_html_template,
            email_server_credentials=humana_smtp,
            email_from=email.email_from,
            email_to=email.to,
            email_to_cc=email.cc,
            email_to_bcc=email.bcc,
            attachments=email.attachments,
            images=email.images,
            dry_run=dry_run,
        )
        wait_for_complete_object.append(sent)
    [future.result() for future in wait_for_complete_object]
Here chunkify, which I stole from another post on Slack, looks like this:
def chunkify(xs, size):
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))
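As a quick illustration of what chunkify yields (a minimal sketch with made-up input, not taken from the flow above): it slices the input into consecutive pieces of at most size items, and because it relies on len() and slicing, the input has to be a sequence such as a list, not a generator.

```python
def chunkify(xs, size):
    # Lazily yield consecutive slices of at most `size` items.
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))


# Hypothetical input: five items split into chunks of two.
chunks = list(chunkify(list(range(5)), 2))
print(chunks)  # [[0, 1], [2, 3], [4]]
```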
Is there a way to limit the number of tasks that are submitted to the task runner at a given time? Task concurrency didn't do the trick for me; it only limited the number of tasks running at a given time.
Tim-Oliver
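02/23/2023, 4:13 PM
from prefect.futures import PrefectFuture

def wait_for_task_runs(
    results: list,
    buffer: list[PrefectFuture],
    max_buffer_length: int = 6,
):
    # Block until the buffer has room, collecting results from the oldest futures first.
    while len(buffer) >= max(1, max_buffer_length):
        results.append(buffer.pop(0).result())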
I call this inside the for-loop that submits the tasks. This way, a new task can start as soon as the first task in the buffer queue has finished.
Kyle Austin
02/23/2023, 4:31 PM
Andrew Huang
02/23/2023, 5:12 PM
Zanie
02/23/2023, 5:27 PM
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Is not the cause of your problems, but a symptom. The API is returning a PENDING state for the task run, which has no data attached to it.
10:44:28.065 | WARNING | Task run 'email_send_message-357' - Task run '52fa4b9c-deb0-407c-a992-ccde4685dfcd' received abort during orchestration: The enclosing flow must be running to begin task execution. Task run is in PENDING state.
Similarly, this is probably a symptom. It looks like the flow has been reported as crashed or failed while task submission is still occurring in the background.
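For reference, the buffer approach Tim-Oliver describes earlier in the thread can be sketched without Prefect. This is only an illustration under assumptions: concurrent.futures stands in for the Prefect task runner, send_email is a hypothetical placeholder for email_send_message, and submit_throttled is a made-up helper name. The idea is that the submitting loop blocks whenever the buffer is full, so only a bounded number of tasks is ever submitted at once.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def wait_for_task_runs(results: list, buffer: list[Future], max_buffer_length: int = 6) -> None:
    # Block until fewer than max_buffer_length futures remain in the buffer,
    # collecting the result of the oldest future first.
    while len(buffer) >= max(1, max_buffer_length):
        results.append(buffer.pop(0).result())


def send_email(n: int) -> int:
    # Hypothetical stand-in for the real email task.
    return n * 2


def submit_throttled(items, max_in_flight: int = 6) -> list:
    results: list = []
    buffer: list[Future] = []
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        for item in items:
            # Make room before submitting, so at most max_in_flight
            # futures are pending at any time.
            wait_for_task_runs(results, buffer, max_in_flight)
            buffer.append(pool.submit(send_email, item))
        # Drain whatever is still in the buffer.
        wait_for_task_runs(results, buffer, max_buffer_length=1)
    return results
```

Because futures are popped in submission order, the results come back in the same order as the inputs. In a Prefect flow the same loop shape would apply, with pool.submit(...) replaced by the task's .submit(...) call.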
Kyle Austin
02/23/2023, 6:35 PM
Zanie
02/23/2023, 7:00 PM
Kyle Austin
02/23/2023, 7:09 PM
Tim-Oliver
02/24/2023, 10:09 AM
Kyle Austin
02/24/2023, 11:18 AM
Zanie
02/24/2023, 4:07 PM
@task option which idk if I’m as excited about
Tim-Oliver
02/24/2023, 4:15 PM
@task would allow me to have fine-grained control. That being said, I am also open to trying other strategies.
Kyle Austin
02/24/2023, 8:21 PM
Zanie
02/24/2023, 8:25 PM