
Sander

04/25/2022, 7:27 PM
Hi, when spinning up tasks in a flow we see some (spin-up?) delays. Is there any documentation on how this delay is minimised? I'd expect that state reporting to the Orion server is offloaded from the “hot loop” to avoid network latency?
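The offloading described in the question can be sketched in plain Python: the hot loop only enqueues state updates, and a background thread drains the queue and performs the slow network call. This is a minimal sketch under stated assumptions, not a description of how Orion actually reports states; report_state and the 10 ms delay are hypothetical stand-ins for an HTTP call to the server.

```python
import queue
import threading
import time

REPORT_LATENCY_S = 0.01  # hypothetical network round-trip per state update


def report_state(event, sink):
    """Hypothetical stand-in for an HTTP call to the orchestration server."""
    time.sleep(REPORT_LATENCY_S)
    sink.append(event)


def reporter(q, sink):
    """Background worker: drain the queue until a None sentinel arrives."""
    while True:
        event = q.get()
        if event is None:
            break
        report_state(event, sink)


def run_hot_loop(n_tasks):
    reported = []
    q = queue.Queue()
    worker = threading.Thread(target=reporter, args=(q, reported), daemon=True)
    worker.start()

    start = time.perf_counter()
    for i in range(n_tasks):
        result = i * i                   # the actual "task" work
        q.put((i, "Completed", result))  # enqueue only; no network wait here
    loop_time = time.perf_counter() - start

    q.put(None)    # ask the worker to stop once the queue is drained
    worker.join()  # wait until every state update has been "sent"
    return loop_time, reported


if __name__ == "__main__":
    loop_time, reported = run_hot_loop(50)
    print(f"hot loop: {loop_time:.4f}s, reports sent: {len(reported)}")
```

The trade-off of this design is that the server learns about state changes slightly after they happen, and updates still sitting in the queue are lost if the process dies before the queue is flushed.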

Kevin Kho

04/25/2022, 7:28 PM
Hi @Sander, you posted this in the introductions channel, but are you using Orion locally or Cloud 2.0?

Sander

04/25/2022, 7:29 PM
Orion locally, indeed. I posted here because I was looking for an intro to the inner workings 🙂, in particular the network I/O, since it seems to be using the web API.
The reason I'm asking is that I have some concerns about the overall timeliness of a large set of tasks within a flow. And I can't really imagine that you haven't thought about that and how it should perform.

Anna Geller

04/25/2022, 7:40 PM
Can you share your flow and a benchmark? There shouldn't be any high latency here. I'd be happy to reproduce it if you share your flow code.

Sander

04/25/2022, 7:41 PM
Cool. Let me get back to you with some benchmark code. Likely tomorrow.
Hi, I know this code is a bit simple, but it highlights the behaviour.
from prefect import task, flow, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
from random import seed, randint
from time import perf_counter
import logging

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)

# seed random number generator
seed(1)

task_runner = ConcurrentTaskRunner()


@task(name="a")
def a():
    i = randint(0, 100)
    logger = get_run_logger()
    logger.info(f"We got {i}")
    return i


@task(name="b")
def b():
    i = randint(0, 100)
    logger = get_run_logger()
    logger.info(f"We got {i}")
    return i


@task(name="c")
def c(_a, _b):
    logger = get_run_logger()
    v = _a * _b
    logger.info(f"{_a} * {_b} = {v}")
    return v


@flow(name="some-flow", task_runner=task_runner)
def test_flow():
    _a = a()
    _b = b()
    _c = c(_a, _b)


def no_flow():
    _a = randint(0, 100)
    _logger.info(f"We got {_a}")
    _b = randint(0, 100)
    _logger.info(f"We got {_b}")
    _c = _a * _b
    _logger.info(f"{_a} * {_b} = {_c}")


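if __name__ == "__main__":
    # 100 flow runs of test_flow (each creates 3 task runs)
    start = perf_counter()
    for cycle in range(100):
        test_flow()

    print(f"Flow took {perf_counter() - start}")

    # the same logic 100 times without Prefect, for comparison
    start = perf_counter()
    for cycle in range(100):
        no_flow()

    print(f"No flow took {perf_counter() - start}")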
I get about 74 s with Prefect and 0.01 s without.
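Dividing the totals by the number of cycles makes the numbers easier to compare: 74 s over 100 flow runs is about 0.74 s per flow run, or at most about 0.25 s per task run, since that figure still includes the flow-run overhead. A minimal sketch of a timing helper; benchmark and per_unit are hypothetical names, not part of Prefect.

```python
from time import perf_counter
from typing import Callable


def benchmark(fn: Callable[[], object], cycles: int = 100) -> tuple[float, float]:
    """Call fn `cycles` times; return (total seconds, average seconds per call)."""
    start = perf_counter()
    for _ in range(cycles):
        fn()
    total = perf_counter() - start
    return total, total / cycles


def per_unit(total_s: float, runs: int, tasks_per_run: int) -> tuple[float, float]:
    """Split a measured total into seconds per flow run and per task run."""
    per_run = total_s / runs
    return per_run, per_run / tasks_per_run


if __name__ == "__main__":
    # The numbers reported above: 74 s for 100 runs of a 3-task flow.
    per_run, per_task = per_unit(74.0, runs=100, tasks_per_run=3)
    print(f"{per_run:.2f} s per flow run, {per_task:.2f} s per task run")
```

Passing test_flow and no_flow to benchmark() in turn would give directly comparable per-call figures.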

Anna Geller

04/26/2022, 1:32 PM
Thanks so much for this example, it helps a lot! This is not entirely correct:
print(f"Flow took {perf_counter() - start}")
because this code doesn't create 100 tasks but rather 100 flow runs of test_flow. It's therefore normal that it causes some overhead, since every flow run creates a separate object in the backend. Does your benchmark imply less than a second per flow run? For me (using Cloud 2.0 with S3 storage and a result backend) it took 3 sec per flow run. Also, I believe the main overhead may not be related to the backend API at all but rather to the fact that Prefect persists all task run results, while your no_flow() doesn't. So a fairer comparison would be one where no_flow() also pickles and persists its results to some storage backend (e.g. S3). I will ask the team whether result checkpointing can be disabled in 2.0; in 1.0 this was possible by adding checkpoint=False on the task decorator.
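A sketch of the fairer baseline described above: a variant of no_flow() that pickles each intermediate result and writes it to storage. A local temporary directory stands in for S3 here (an assumption to keep the example self-contained); persist and no_flow_persisted are hypothetical names, and this does not reproduce exactly what Prefect writes.

```python
import logging
import pickle
import tempfile
from pathlib import Path
from random import randint, seed

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)


def persist(value, directory: Path, name: str) -> Path:
    """Pickle a result to disk, standing in for an S3 result backend."""
    path = directory / f"{name}.pickle"
    path.write_bytes(pickle.dumps(value))
    return path


def no_flow_persisted(directory: Path, cycle: int) -> int:
    """Same logic as no_flow(), but every intermediate result is persisted."""
    _a = randint(0, 100)
    persist(_a, directory, f"{cycle}-a")
    _logger.info(f"We got {_a}")
    _b = randint(0, 100)
    persist(_b, directory, f"{cycle}-b")
    _logger.info(f"We got {_b}")
    _c = _a * _b
    persist(_c, directory, f"{cycle}-c")
    _logger.info(f"{_a} * {_b} = {_c}")
    return _c


if __name__ == "__main__":
    seed(1)
    with tempfile.TemporaryDirectory() as tmp:
        for cycle in range(100):
            no_flow_persisted(Path(tmp), cycle)
        print(f"{len(list(Path(tmp).glob('*.pickle')))} results persisted")
```

Timing this loop the same way as test_flow() would help separate serialization and I/O cost from orchestration overhead.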

Sander

04/26/2022, 2:28 PM
Yeah. The loop was mostly there to get reasonably sized numbers instead of tiny ones :)
I'm inclined to think that the logger context also adds quite a bit of overhead (need to check), as I guess it needs the context from the server.
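One way to check the logger hypothesis is to time the same work with and without the log call. This sketch uses a standard-library logger writing to an in-memory stream as a stand-in, so it only measures stdlib logging cost; testing Prefect's run logger would mean repeating the comparison inside a task that calls get_run_logger(). The function names are hypothetical.

```python
import io
import logging
from random import randint
from time import perf_counter

# Send log records to an in-memory stream so terminal I/O doesn't skew timings.
stream = io.StringIO()
bench_logger = logging.getLogger("bench")
bench_logger.addHandler(logging.StreamHandler(stream))
bench_logger.setLevel(logging.INFO)
bench_logger.propagate = False  # keep records away from the root logger's handlers


def work_with_logging() -> int:
    i = randint(0, 100)
    bench_logger.info(f"We got {i}")
    return i


def work_without_logging() -> int:
    return randint(0, 100)


def time_calls(fn, n: int = 10_000) -> float:
    """Return total seconds spent calling fn n times."""
    start = perf_counter()
    for _ in range(n):
        fn()
    return perf_counter() - start


if __name__ == "__main__":
    with_log = time_calls(work_with_logging)
    without_log = time_calls(work_without_logging)
    print(f"with logging: {with_log:.4f}s, without: {without_log:.4f}s")
    print(f"difference: {with_log - without_log:.4f}s over 10000 calls")
```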

Anna Geller

04/26/2022, 2:36 PM
That's correct. As promised, I asked about disabling result checkpointing, and it looks like it's not possible at the moment because we don't expose the results interface yet (it's only used internally so far), but this is an active roadmap item.