Sylvain Hazard

    11 months ago
    I'm having issues with mapped tasks that run when they shouldn't, with the wrong input. I'll detail in the thread so as to not post a huge wall of text.
    Our flow feels pretty simple and looks something like this:
    batch_size = Parameter("batch_size", default = 50)
    
    codes = get_codes() # Gets a list of codes from crawling a website (~13 000 codes)
    batches = batch_generator(codes, batch_size) # Generates batches in order to parallelize tasks
    documents = get_documents.map(batches) # Loads a document for each code in the batch. Runs about 30s to 2 min for a batch of size 50
    uploaded_documents = upload_documents(documents) # Uploads the loaded documents to a DB. Runs ~5 min to 20 min for a batch of size 50
    parsed_documents = parse_documents.map(uploaded_documents) # Parses the uploaded documents using a home API. Runs about ~1 min for a batch of 50.
    We run this Flow on a DaskExecutor with 2 workers. The current configuration generates maps of ~300 children. The issue we see is the following:
    • The first tasks run flawlessly, as expected.
    • The first upload tasks run correctly.
    • At some point, while 4 upload tasks are already running, every remaining pending child upload task is launched at the same time with a `None` input value, which makes them fail.
    • The 4 tasks that were running at that time end up being killed by the Zombie Killer after a while.
    • As we are using a Dask Executor, DFE (depth-first execution) allows a portion of downstream parsing tasks to succeed. The rest is divided between failed tasks (receiving `None` inputs as well) and TriggerFailed tasks.
    Here's the schematic of our last run. There is an additional parameter but it is not relevant whatsoever to our issue.
    For details, 51/311 upload runs succeeded, 4 were killed by the Zombie Killer, and the rest were launched with a `None` input and failed because of it. 51/311 parsing tasks succeeded, 225/311 failed, and 35/311 TriggerFailed.
    If at all relevant, it took about 2.5h between launching the first upload job and the "mis-launched" ones.
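    For context, the pipeline described above could be sketched as a minimal Prefect 1.x flow like the following. This is a reconstruction, not the actual code from the thread: the flow name is made up, task bodies are elided, and `upload_documents` is shown mapped, as confirmed later in the thread.

    ```python
    from prefect import Flow, Parameter, task

    @task
    def get_codes(): ...          # crawl the website, return ~13K codes

    @task
    def batch_generator(codes, batch_size): ...  # slice codes into batches

    @task
    def get_documents(batch): ...       # load one document per code

    @task
    def upload_documents(documents): ...  # push loaded documents to the DB

    @task
    def parse_documents(documents): ...   # parse via the in-house API

    with Flow("document-pipeline") as flow:
        batch_size = Parameter("batch_size", default=50)
        codes = get_codes()
        batches = batch_generator(codes, batch_size)
        documents = get_documents.map(batches)
        uploaded = upload_documents.map(documents)
        parsed = parse_documents.map(uploaded)
    ```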
Kevin Kho

    11 months ago
    Hey @Sylvain Hazard, am staring at this and not sure what’s going on. Could you show me the code for `batch_generator`?
Sylvain Hazard

    11 months ago
    Sure thing.
    from itertools import zip_longest
    from typing import Any, List, Tuple

    import prefect
    from prefect import task

    @task(name="batch-generator")
    def batch_generator(iterable, batch_size: int, padding_value: Any = None) -> List[Tuple[Any, ...]]:
        logger = prefect.context.get("logger")
        # Slice the iterable into fixed-size batches, padding the last one if needed
        batches = list(zip_longest(*[iter(iterable)] * batch_size, fillvalue=padding_value))
        # Drop the padding values again so the final batch may be shorter
        batches_unpadded = [tuple(b for b in batch if b is not None) for batch in batches]
        logger.info(f"Generating {len(batches_unpadded)} batches with batch_size={batch_size}")
        return batches_unpadded
    I did not write this and am not sure why we need the padding, but it probably does not matter. Basically, it takes a long list and returns multiple slices of that list, each of size `batch_size`.
    It allows us to still parallelize while not creating a child task for each element of the initial list, which is approx. 13K elements long.
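    Stripped of the Prefect decorator, the batching trick above can be demonstrated with the standard library alone (`make_batches` is a hypothetical name for illustration; the logic mirrors the task body):

    ```python
    from itertools import zip_longest

    def make_batches(iterable, batch_size, padding_value=None):
        """Slice an iterable into tuples of at most batch_size elements."""
        # zip_longest over batch_size copies of the same iterator walks the
        # input in strides of batch_size, padding the last stride with fillvalue
        batches = list(zip_longest(*[iter(iterable)] * batch_size, fillvalue=padding_value))
        # Strip the fill values from the last, possibly short, batch
        return [tuple(x for x in batch if x is not None) for batch in batches]

    print(make_batches(range(1, 8), 3))  # [(1, 2, 3), (4, 5, 6), (7,)]
    ```

    Note the caveat this implies: any genuine `None` elements in the input would be silently dropped along with the padding.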
Kevin Kho

    11 months ago
    I will try testing this today
    Is `uploaded_documents = upload_documents(documents)` a mapped operation?
    I am wondering if this fails due to DB concurrency?
    Do you know if your DaskExecutor workers are dying/restarting?
Sylvain Hazard

    11 months ago
    Yeah sorry, upload_documents is mapped.
    I have no idea about the DaskExecutor workers dying; I don't know how I could get information about that.
    We are using CouchDB. Its concurrency does not seem great, but since ~50 upload jobs succeed before the flow goes haywire, I don't think the issue is directly linked to it. I could be wrong, though.
Kevin Kho

    11 months ago
    I assume this works for a smaller number of tasks and it’s a scaling issue, right?
Sylvain Hazard

    11 months ago
    Yeah, it ran correctly when debugging on a smaller number of documents.
    Just relaunched it with a 500-document limit in 10 batches. I'll keep you posted when I get the result, just in case.
    The "small scale" run did succeed.
    Is there any way that running longer and doing more work makes the executors run out of memory or something?
Kevin Kho

    11 months ago
    Definitely. This video will help understand that.
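    One thing worth checking under a memory hypothesis is capping worker memory explicitly, so Dask spills to disk or restarts a worker visibly instead of letting it die mid-map. A configuration sketch, assuming Prefect 1.x with a local Dask cluster (the limits shown are placeholders, not recommendations; `cluster_kwargs` are forwarded to `distributed.LocalCluster`):

    ```python
    from prefect.executors import DaskExecutor

    executor = DaskExecutor(
        cluster_kwargs={
            "n_workers": 2,
            "threads_per_worker": 1,
            "memory_limit": "4GiB",  # placeholder per-worker cap; tune for your machine
        }
    )

    # `flow` is the Flow object defined for the pipeline
    flow.run(executor=executor)
    ```

    With an explicit limit, the Dask dashboard and worker logs will show pausing, spilling, or restarts near the cap, which would confirm or rule out memory pressure as the cause of the dying workers.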
Sylvain Hazard

    11 months ago
    I'll check that out, looks like a promising lead! I'm off for the weekend right now and won't have time to do it until Monday. Thanks for your help, and I'll keep you posted if I find any lead on this. Cheers!
Kevin Kho

    11 months ago
    Of course! Not sure this will solve the issue but we can try