Sander
04/25/2022, 7:27 PM
Kevin Kho
04/25/2022, 7:28 PM
Sander
04/25/2022, 7:29 PM
Anna Geller
04/25/2022, 7:40 PM
Sander
04/25/2022, 7:41 PM
```python
from prefect import task, flow, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
from random import seed, randint
from time import perf_counter
import logging

logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)

# seed random number generator
seed(1)

task_runner = ConcurrentTaskRunner()


@task(name="a")
def a():
    i = randint(0, 100)
    logger = get_run_logger()
    logger.info(f"We got {i}")
    return i


@task(name="b")
def b():
    i = randint(0, 100)
    logger = get_run_logger()
    logger.info(f"We got {i}")
    return i


@task(name="c")
def c(_a, _b):
    logger = get_run_logger()
    v = _a * _b
    logger.info(f"{_a} * {_b} = {v}")
    return v


@flow(name="some-flow", task_runner=task_runner)
def test_flow():
    # three task runs per flow run; c consumes the results of a and b
    _a = a()
    _b = b()
    _c = c(_a, _b)


# same computation in plain Python, without Prefect
def no_flow():
    _a = randint(0, 100)
    _logger.info(f"We got {_a}")
    _b = randint(0, 100)
    _logger.info(f"We got {_b}")
    _c = _a * _b
    _logger.info(f"{_a} * {_b} = {_c}")


if __name__ == "__main__":
    start = perf_counter()
    for cycle in range(100):
        test_flow()
    # total wall-clock time for all 100 runs
    print(f"Flow took {perf_counter() - start}")
```
Anna Geller
04/26/2022, 1:32 PM
```python
print(f"Flow took {perf_counter() - start}")
```
because this code doesn't create 100 tasks but rather 100 flow runs of `test_flow`. Therefore, some overhead is normal, since every flow run creates a separate object in the backend. Does your benchmark imply less than a second per flow run? For me (using Cloud 2.0 with S3 storage and result backend), it took 3 seconds per flow run.
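The script prints one total for all 100 flow runs, so the per-run figure in question is that total divided by 100. Below is a minimal timing sketch that reports both numbers. It assumes a plain-Python workload modeled on `no_flow()`, without the logging. `benchmark` and `plain_workload` are hypothetical helpers, not Prefect APIs. Passing `test_flow` as `fn` should time the Prefect version the same way, assuming Prefect 2.0 is installed.

```python
from random import randint, seed
from time import perf_counter
from typing import Callable, Tuple


def plain_workload() -> int:
    # Hypothetical stand-in for no_flow(): two random ints and their product,
    # with no orchestration, logging, or result persistence
    a = randint(0, 100)
    b = randint(0, 100)
    return a * b


def benchmark(fn: Callable[[], object], runs: int = 100) -> Tuple[float, float]:
    """Call fn `runs` times; return (total seconds, average seconds per call)."""
    start = perf_counter()
    for _ in range(runs):
        fn()
    total = perf_counter() - start
    return total, total / runs


if __name__ == "__main__":
    seed(1)
    total, per_run = benchmark(plain_workload, runs=100)
    print(f"total: {total:.6f} s, per run: {per_run * 1000:.6f} ms")
```

Reporting the per-run average next to the total makes the two setups directly comparable, whatever `runs` is set to.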
Also, I believe the main overhead may not come from the backend API at all, but from the fact that Prefect persists all task run results, while your `no_flow()` doesn't. A fairer comparison would be one where `no_flow()` also pickles and persists its results to a storage backend (e.g. S3).
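Below is a rough sketch of such a fairer baseline. It assumes a local temporary directory as a stand-in for S3; a real S3 upload would add network latency on top. `persist` and `no_flow_persisted` are hypothetical names, and the sketch does not reproduce Prefect's actual result format. It only pickles each intermediate value to its own file, one per "task".

```python
import pickle
import tempfile
from pathlib import Path
from random import randint
from uuid import uuid4


def persist(value: object, directory: Path) -> Path:
    # Pickle one result to its own uniquely named file
    # (a local stand-in for writing to a result backend such as S3)
    path = directory / f"{uuid4().hex}.pickle"
    path.write_bytes(pickle.dumps(value))
    return path


def no_flow_persisted(directory: Path) -> int:
    # Same computation as no_flow(), but every intermediate result is persisted
    a = randint(0, 100)
    persist(a, directory)
    b = randint(0, 100)
    persist(b, directory)
    c = a * b
    persist(c, directory)
    return c


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        result = no_flow_persisted(Path(tmp))
        # one pickle file per persisted value
        print(result, len(list(Path(tmp).iterdir())))
```

Timing this function alongside `test_flow` would separate the cost of persisting results from the cost of talking to the backend API.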
I will ask the team whether result checkpointing can be disabled in 2.0; in 1.0 this was possible by adding `checkpoint=False` to the task decorator.
Sander
04/26/2022, 2:28 PM
Anna Geller
04/26/2022, 2:36 PM