https://prefect.io logo
Title
m

Marcus Hughes

08/01/2022, 5:05 PM
Code Contest What if we have many (and by many I mean MANY) subflows? In fact, let's just recursively create sub-flows while begging for a Prefect keyboard and/or a Marvin Rubber Duck ๐Ÿ˜†. See the first comment for code. As one might expect, Prefect runs into trouble when
subflow_count
(the number of subflows a flow generates aka breadth of the recursion tree) and
levels
(how many chained subflows there are aka the depth of the recursion tree) are large, 25 and 10 here respectively. That's 10^25 - 1 flows, truly absurd with long chains of subflows. When it fails, I get
OSError: [Errno 24] Too many open files
. I noticed that the crashed flows don't get marked as crashed but stay with the
running
status. I'm not sure there's an easy way for you to resolve that though. On the positive side, my computer didn't crash! At smaller values, e.g.
subflow_count=8
and
levels=3
, it runs just fine. My team and I are excited to use Prefect in the upcoming NASA PUNCH mission to prepare scientific data of the Sun. It's really helped make our pipeline more elegant.
๐Ÿš€ 22
๐Ÿ™Œ 1
๐Ÿง‘โ€๐Ÿš€ 11
๐ŸŒž 12
๐Ÿช 13
from prefect import flow
import asyncio
import coolname
import random

BEGS = ["I'd love a Prefect keyboard.",
        "Yay Marvin!",
        "I wish I had a Prefect rubber duck too!"]\



def generate_subflow(level=2, subflow_count=3):
    if level > 0:
        @flow(retries=10, name=f"subflow-{level}-{coolname.generate_slug(2)}")
        async def subflow():
            if random.random() < 0.05:
                await asyncio.sleep(10)
                raise RuntimeError("Sometimes I fail. But Prefect helps me try again.")
            else:
                print(random.choice(BEGS))

            next_level = [generate_subflow(level=level-1, subflow_count=subflow_count)()
                          for _ in range(subflow_count)]
            await asyncio.gather(*next_level)
        return subflow
    else:
        @flow
        async def goodbye():
            print("Goodbye for now!")
        return goodbye


@flow
async def contest_flow(subflow_count=25, levels=10):
    to_run = [generate_subflow(level=levels, subflow_count=subflow_count)() for _ in range(subflow_count)]
    await asyncio.gather(*to_run)


if __name__ == "__main__":
    results = asyncio.run(contest_flow())
j

jawnsy

08/01/2022, 5:29 PM
Hey there! What OS are you running on? Often these types of errors are due to file descriptor limits, and at least on Linux, you can tune these using
ulimit
You may need to tune the โ€œopen filesโ€ count, which is the maximum number of file descriptors that processes can have open at a given time, which includes both files and sockets on Unix-like systems, including Linux and possibly macOS (Iโ€™m not too sure how macs handle this) https://access.redhat.com/solutions/61334 https://linuxhint.com/permanently_set_ulimit_value/
:thank-you: 2
c

Chris Reuter

08/01/2022, 5:29 PM
What an awesome submission! Thank you @Marcus Hughes :thank-you:
๐Ÿ‘ 1
๐Ÿ˜€ 1
m

Marcus Hughes

08/01/2022, 5:34 PM
@jawnsy I ran this on macOS. Maybe if I find the time later, I'll try playing around with your suggestion and see what happens.
j

jawnsy

08/01/2022, 5:37 PM
Checking on my 2021 MacBook M1 running Monterey (I think), I get:
$ ulimit -a
core file size              (blocks, -c) 0
data seg size               (kbytes, -d) unlimited
file size                   (blocks, -f) unlimited
max locked memory           (kbytes, -l) unlimited
max memory size             (kbytes, -m) unlimited
open files                          (-n) 256
pipe size                (512 bytes, -p) 1
stack size                  (kbytes, -s) 8176
cpu time                   (seconds, -t) unlimited
max user processes                  (-u) 2666
virtual memory              (kbytes, -v) unlimited
256 is a really tiny number, so changing it might help. I just tried increasing to 8192 and that was fine. It might be possible to increase it further using sudo and changing hard limits, or changing the limits.conf if Mac has thatโ€ฆ
๐Ÿ™Œ 1