Sylvain Hazard

10/08/2021, 8:54 AM
I'm having issues with mapped tasks that run when they should not, and with the wrong input. I'll detail in the thread so as to not post a huge wall of text.
Our flow feels pretty simple and looks something like this:
batch_size = Parameter("batch_size", default = 50)

codes = get_codes() # Gets a list of codes from crawling a website (~13 000 codes)
batches = batch_generator(codes, batch_size) # Generates batches in order to parallelize tasks
documents = get_documents.map(batches) # Loads a document for each code in the batch. Runs about 30s to 2 min for a batch of size 50
uploaded_documents = upload_documents(documents) # Uploads the loaded documents to a DB. Runs ~5 min to 20 min for a batch of size 50
parsed_documents = parse_documents.map(uploaded_documents) # Parses the uploaded documents using a home API. Runs about ~1 min for a batch of 50.
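For context, the calls above sit inside a Prefect 1.x Flow block with a DaskExecutor attached at run time, roughly like this (a sketch: the flow name is a placeholder and the cluster settings are only illustrative):

from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("documents-pipeline") as flow:
    ...  # the Parameter and task calls shown above go here

# two local Dask workers, matching our setup
flow.run(executor=DaskExecutor(cluster_kwargs={"n_workers": 2}))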
We run this Flow on a DaskExecutor with 2 workers. The current configuration generates maps of ~300 children. The issue we see is the following:
• The first tasks run flawlessly, as expected.
• The first upload tasks run correctly.
• At some point, while 4 upload tasks are already running, every remaining pending child upload task is launched at the same time with a None input value, which makes them fail.
• The 4 tasks that were running at that time end up being killed by the Zombie Killer after a while.
• As we are using a DaskExecutor, DFE (depth-first execution) allows a portion of downstream parsing tasks to succeed. The rest is divided between failed tasks (receiving None inputs as well) and TriggerFailed tasks.
Here's the schematic of our last run. There is an additional parameter, but it is not relevant whatsoever to our issue.
For details, 51/311 upload runs succeeded, 4 were killed by the Zombie Killer, and the rest were launched with a None input and failed because of it. 51/311 parsing tasks succeeded, 225/311 failed and 35/311 were TriggerFailed.
If at all relevant, it took about 2.5h between launching the first upload job and the mis-launched ones.

Kevin Kho

10/08/2021, 2:18 PM
Hey @Sylvain Hazard, am staring at this and not sure what’s going on. Could you show me the code for batch_generator?

Sylvain Hazard

10/08/2021, 2:21 PM
Sure thing.
@task(name="batch-generator")
def batch_generator(iterable, batch_size: int, padding_value: Any = None) -> List[Any]:
    batches = list(zip_longest(*[iter(iterable)] * batch_size, fillvalue=padding_value))
    batches_unpadded = [tuple(b for b in batch if b is not None) for batch in batches]
    <http://logger.info|logger.info>(f"Generating {len(batches_unpadded)} batchs with batch_size={batch_size}")
    return batches_unpadded
I did not code this and am not sure why we need to pad those, but it probably does not matter. Basically, it takes a long list and returns multiple slices of that list, each of size batch_size.
It allows us to still parallelize while not creating a child for each element of the initial list, which is approx. 13K elements long.
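To make the batching concrete, here is what it produces on a small made-up list (13 items, batch_size=5), outside of Prefect:

from itertools import zip_longest

codes = list(range(13))  # stand-in for the ~13K real codes
batches = list(zip_longest(*[iter(codes)] * 5, fillvalue=None))
# -> [(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12, None, None)]
unpadded = [tuple(b for b in batch if b is not None) for batch in batches]
# -> [(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12)]

So the padding is just what zip_longest needs to fill out the last batch, and the second line strips it back out.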

Kevin Kho

10/08/2021, 3:26 PM
I will try testing this today
Is the uploaded_documents = upload_documents(documents) call a mapped operation?
I am wondering if this fails due to DB concurrency?
Do you know if your DaskExecutor workers are dying/restarting?
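If you can point a client at the same scheduler (easiest when the DaskExecutor is connected to an existing cluster rather than a temporary one), a rough sketch like this would at least show which workers are alive and their reported memory; the address is a placeholder:

from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder scheduler address
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info.get("metrics", {}).get("memory"))

The Dask dashboard shows the same thing if you have access to it.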

Sylvain Hazard

10/08/2021, 4:26 PM
Yeah sorry, upload_documents is mapped.
I have no idea about the DaskExecutor workers dying; I don't know how I could get information about it.
We are using CouchDB. Concurrency does not seem great, but since ~50 upload jobs succeed before the flow goes crazy, I don't think the issue is directly linked to it. I could be wrong, though.

Kevin Kho

10/08/2021, 4:29 PM
I assume this works for a smaller amount of tasks and it’s a scaling issue right?

Sylvain Hazard

10/08/2021, 4:33 PM
Yeah, it ran correctly when debugging on a smaller number of documents.
Just relaunched it with a 500-document limit in 10 batches; I'll keep you posted when I get the result, just in case.
The "small scale" run did succeed.
Is there any way that running longer and doing more stuff makes the executors run out of memory or something?

Kevin Kho

10/08/2021, 5:09 PM
Definitely. This video will help you understand that.
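If worker memory does turn out to be the problem, one thing to experiment with (the values here are just examples) is giving the workers explicit limits so they spill or get restarted at a known threshold instead of degrading silently:

from prefect.executors import DaskExecutor

executor = DaskExecutor(
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 1, "memory_limit": "4GiB"}
)

By default the DaskExecutor spins up a local cluster, so these kwargs go straight to it.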

Sylvain Hazard

10/08/2021, 5:12 PM
I'll check that out, it looks like a promising lead! I'm off for the weekend right now and won't have time to do it until Monday. Thanks for your help and I'll keep you posted if I have any lead on this. Cheers!

Kevin Kho

10/08/2021, 5:12 PM
Of course! Not sure this will solve the issue but we can try