Tony Yun

08/02/2023, 5:28 PM
Hi, in Prefect 2, when I have tasks like this (using yield):
from prefect import flow, task

@task
def fetch_id():
    for i in range(5):
        print("fetch id: ", i)
        yield i

@task
def process(id):
    print(id)

@flow(log_prints=True)
def flow():
    for id in fetch_id():
        process(id)
I noticed the process task won't execute until all ids have been generated from fetch_id(). I'm wondering what the yield is really doing here. It seems the values are still collected in memory like a list and then looped through. See the logs for more details:
12:18:31.217 | DEBUG   | prefect.profiles - Using profile 'default'
12:18:31.662 | DEBUG   | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////Users/tonyyun/.prefect/orion.db
12:18:31.875 | INFO    | prefect.engine - Created flow run 'grumpy-hawk' for flow 'fax-ocr-flow'
12:18:31.876 | DEBUG   | Flow run 'grumpy-hawk' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
12:18:31.877 | DEBUG   | prefect.task_runner.concurrent - Starting task runner...
12:18:31.880 | DEBUG   | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////Users/tonyyun/.prefect/orion.db
12:18:32.025 | DEBUG   | Flow run 'grumpy-hawk' - Executing flow 'fax-ocr-flow' for flow run 'grumpy-hawk'...
12:18:32.026 | DEBUG   | Flow run 'grumpy-hawk' - Beginning execution...
12:18:32.069 | INFO    | Flow run 'grumpy-hawk' - Created task run 'fetch_id-0' for task 'fetch_id'
12:18:32.070 | INFO    | Flow run 'grumpy-hawk' - Executing 'fetch_id-0' immediately...
12:18:32.132 | DEBUG   | Task run 'fetch_id-0' - Beginning execution...
12:18:32.134 | INFO    | Task run 'fetch_id-0' - fetch tax id:  0
12:18:32.135 | INFO    | Task run 'fetch_id-0' - fetch tax id:  1
12:18:32.135 | INFO    | Task run 'fetch_id-0' - fetch tax id:  2
12:18:32.136 | INFO    | Task run 'fetch_id-0' - fetch tax id:  3
12:18:32.137 | INFO    | Task run 'fetch_id-0' - fetch tax id:  4
12:18:32.180 | INFO    | Task run 'fetch_id-0' - Finished in state Completed()
12:18:32.212 | INFO    | Flow run 'grumpy-hawk' - Created task run 'download_pdf-0' for task 'download_pdf'
12:18:32.213 | INFO    | Flow run 'grumpy-hawk' - Executing 'download_pdf-0' immediately...
12:18:32.272 | DEBUG   | Task run 'download_pdf-0' - Beginning execution...
12:18:32.273 | INFO    | Task run 'download_pdf-0' - Download fax:  0
12:18:32.309 | INFO    | Task run 'download_pdf-0' - Finished in state Completed()
12:18:32.311 | INFO    | Flow run 'grumpy-hawk' - Using sort strategy: categories=[PriorAuth(matching_word='Prior Authorization', s3_folder='/prior_auth/'), Refill(matching_word='Rx Refill', s3_folder='/refill/')]
12:18:32.339 | INFO    | Flow run 'grumpy-hawk' - Created task run 'download_pdf-1' for task 'download_pdf'
12:18:32.340 | INFO    | Flow run 'grumpy-hawk' - Executing 'download_pdf-1' immediately...
12:18:32.397 | DEBUG   | Task run 'download_pdf-1' - Beginning execution...
12:18:32.398 | INFO    | Task run 'download_pdf-1' - Download fax:  1
12:18:32.437 | INFO    | Task run 'download_pdf-1' - Finished in state Completed()
12:18:32.439 | INFO    | Flow run 'grumpy-hawk' - Using sort strategy: categories=[PriorAuth(matching_word='Prior Authorization', s3_folder='/prior_auth/'), Refill(matching_word='Rx Refill', s3_folder='/refill/')]
12:18:32.468 | INFO    | Flow run 'grumpy-hawk' - Created task run 'download_pdf-2' for task 'download_pdf'
12:18:32.469 | INFO    | Flow run 'grumpy-hawk' - Executing 'download_pdf-2' immediately...
12:18:32.528 | DEBUG   | Task run 'download_pdf-2' - Beginning execution...
12:18:32.530 | INFO    | Task run 'download_pdf-2' - Download fax:  2
12:18:32.569 | INFO    | Task run 'download_pdf-2' - Finished in state Completed()
12:18:32.571 | INFO    | Flow run 'grumpy-hawk' - Using sort strategy: categories=[PriorAuth(matching_word='Prior Authorization', s3_folder='/prior_auth/'), Refill(matching_word='Rx Refill', s3_folder='/refill/')]
12:18:32.603 | INFO    | Flow run 'grumpy-hawk' - Created task run 'download_pdf-3' for task 'download_pdf'
12:18:32.605 | INFO    | Flow run 'grumpy-hawk' - Executing 'download_pdf-3' immediately...
12:18:32.672 | DEBUG   | Task run 'download_pdf-3' - Beginning execution...
12:18:32.673 | INFO    | Task run 'download_pdf-3' - Download fax:  3
12:18:32.719 | INFO    | Task run 'download_pdf-3' - Finished in state Completed()
12:18:32.721 | INFO    | Flow run 'grumpy-hawk' - Using sort strategy: categories=[PriorAuth(matching_word='Prior Authorization', s3_folder='/prior_auth/'), Refill(matching_word='Rx Refill', s3_folder='/refill/')]
12:18:32.757 | INFO    | Flow run 'grumpy-hawk' - Created task run 'download_pdf-4' for task 'download_pdf'
12:18:32.757 | INFO    | Flow run 'grumpy-hawk' - Executing 'download_pdf-4' immediately...
12:18:32.829 | DEBUG   | Task run 'download_pdf-4' - Beginning execution...
12:18:32.831 | INFO    | Task run 'download_pdf-4' - Download fax:  4
12:18:32.870 | INFO    | Task run 'download_pdf-4' - Finished in state Completed()
12:18:32.872 | INFO    | Flow run 'grumpy-hawk' - Using sort strategy: categories=[PriorAuth(matching_word='Prior Authorization', s3_folder='/prior_auth/'), Refill(matching_word='Rx Refill', s3_folder='/refill/')]
12:18:32.916 | DEBUG   | prefect.task_runner.concurrent - Shutting down task runner...
12:18:32.917 | INFO    | Flow run 'grumpy-hawk' - Finished in state Completed('All states completed.')

Nate

08/02/2023, 6:41 PM
tasks do not work as generators; they return the entire iterable that results from exhausting the generator. instead you can .map tasks over a generator like this:
In [23]: @task
    ...: def add_42(v: int) -> int:
    ...:     return v + 42
    ...:

In [24]: @flow
    ...: def foo():
    ...:     futures = add_42.map((i for i in range(3)))
    ...:     print([f.result() for f in futures])
some context on the intended behaviour here

Tony Yun

08/02/2023, 6:44 PM
cool. thanks for sharing the tests. So we still have to manage it with a list that might be very long. Is there no way to save memory and process the items as a stream?

Nate

08/02/2023, 6:49 PM
the example I'm showing above uses a generator expression to avoid loading the whole range into memory
if you don't want a task for each item yielded from the iterable, you could change the task to accept a chunk of the iterable that is small enough to load into memory, and then map over the yielded chunks
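A minimal sketch of the chunking idea in plain Python, with no Prefect calls. The helper name `chunked` and the function `process_chunk` are hypothetical stand-ins, not part of any library; in a flow, `process_chunk` would presumably be the task that receives each chunk.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` items, pulling lazily from `items`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def process_chunk(chunk: List[int]) -> int:
    # Hypothetical stand-in for a task body: here it just sums the chunk.
    return sum(chunk)


# Each chunk is built only when the loop asks for it.
totals = [process_chunk(chunk) for chunk in chunked(range(10), 4)]
print(totals)  # [6, 22, 17]
```

The chunk size is the knob here: it trades the number of task runs against how much of the iterable sits in memory at once.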

Tony Yun

08/02/2023, 6:57 PM
that example is in a perfect world. If I change it slightly to:
def _yield():
    for i in range(3):
        print("yield: ", i)
        yield i


@flow(log_prints=True)
def my_flow():
    my_task.map(_yield())
it again yields all the items before any of them is processed:
14:56:21.540 | INFO    | Flow run 'sociable-raptor' - yield:  0
14:56:21.542 | INFO    | Flow run 'sociable-raptor' - yield:  1
14:56:21.545 | INFO    | Flow run 'sociable-raptor' - yield:  2

14:56:21.765 | INFO    | Flow run 'sociable-raptor' - Submitted task run 'my_task-2' for execution.
14:56:21.994 | DEBUG   | Task run 'my_task-1' - Beginning execution...
14:56:21.998 | INFO    | Task run 'my_task-1' - 1
14:56:22.102 | DEBUG   | Task run 'my_task-2' - Beginning execution...
14:56:22.106 | INFO    | Task run 'my_task-2' - 2

Nate

08/02/2023, 7:08 PM
ahh yeah that's because the submit method (used by map under the hood) just returns right away with a future. if instead you do
from prefect import flow, task

def _yield():
    for i in range(3):
        print("yield: ", i)
        yield i


@task
def my_task(x):
    print(x)

@flow(log_prints=True)
def my_flow():
    for i in _yield():
        my_task(i)
then you get
14:05:04.807 | INFO    | prefect.engine - Created flow run 'fancy-harrier' for flow 'my-flow'
14:05:05.355 | INFO    | Flow run 'fancy-harrier' - yield:  0
14:05:05.570 | INFO    | Flow run 'fancy-harrier' - Created task run 'my_task-0' for task 'my_task'
14:05:05.572 | INFO    | Flow run 'fancy-harrier' - Executing 'my_task-0' immediately...
14:05:06.000 | INFO    | Task run 'my_task-0' - 0
14:05:06.245 | INFO    | Task run 'my_task-0' - Finished in state Completed()
14:05:06.253 | INFO    | Flow run 'fancy-harrier' - yield:  1
14:05:07.797 | INFO    | Flow run 'fancy-harrier' - Created task run 'my_task-1' for task 'my_task'
14:05:07.798 | INFO    | Flow run 'fancy-harrier' - Executing 'my_task-1' immediately...
14:05:10.348 | INFO    | Task run 'my_task-1' - 1
14:05:10.852 | INFO    | Task run 'my_task-1' - Finished in state Completed()
14:05:10.866 | INFO    | Flow run 'fancy-harrier' - yield:  2
14:05:11.019 | INFO    | Flow run 'fancy-harrier' - Created task run 'my_task-2' for task 'my_task'
14:05:11.021 | INFO    | Flow run 'fancy-harrier' - Executing 'my_task-2' immediately...
14:05:11.453 | INFO    | Task run 'my_task-2' - 2
14:05:11.608 | INFO    | Task run 'my_task-2' - Finished in state Completed()
14:05:11.737 | INFO    | Flow run 'fancy-harrier' - Finished in state Completed('All states completed.')
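The contrast between the two runs can be reproduced with the standard library alone. This is only an analogy, assuming that `concurrent.futures.Executor.map` behaves similarly to a submit-based map in the relevant respect: it submits every item before returning, so the generator is drained up front. It is not Prefect's implementation, and the names `numbers` and `double` are made up for the illustration.

```python
from concurrent.futures import ThreadPoolExecutor

yielded = []


def numbers():
    for i in range(3):
        yielded.append(i)  # record when the generator is actually advanced
        yield i


def double(x):
    return x * 2


with ThreadPoolExecutor(max_workers=2) as pool:
    results = pool.map(double, numbers())
    # Executor.map submits every item before returning, so the generator
    # is already exhausted here, before any result has been read.
    drained_up_front = list(yielded)
    eager_results = list(results)

yielded.clear()
lazy_trace = []
for n in numbers():
    # In a plain loop the generator has advanced only to the current item.
    lazy_trace.append((list(yielded), double(n)))

print(drained_up_front)  # [0, 1, 2]
print(lazy_trace)        # [([0], 0), ([0, 1], 2), ([0, 1, 2], 4)]
```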

Tony Yun

08/02/2023, 7:10 PM
hold on. this goes back to my original example I think. Let me run yours once.

Nate

08/02/2023, 7:11 PM
the difference is that the thing that's yielding in my example is not a task
a task returns the full iterable, i.e. everything that can be pulled out of the generator
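A rough plain-Python model of that difference. The `eager_task` decorator is a hypothetical stand-in that drains a generator before returning, which is the behaviour described above; it is not Prefect's code, and `fetch_ids` and `run` are illustrative names.

```python
import functools
import inspect

events = []


def eager_task(fn):
    """Hypothetical stand-in for a task wrapper; not Prefect's code."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        result = fn(*args, **kwargs)
        if inspect.isgenerator(result):
            # Returning a concrete value means draining the generator first.
            result = list(result)
        return result
    return wrapper


def fetch_ids():
    for i in range(3):
        events.append(f"yield {i}")
        yield i


def run(source):
    # Iterate over whatever `source()` returns and record the ordering.
    events.clear()
    for i in source():
        events.append(f"process {i}")
    return list(events)


wrapped_order = run(eager_task(fetch_ids))
plain_order = run(fetch_ids)
print(wrapped_order)  # all yields first, then all processing
print(plain_order)    # yields and processing interleaved
```

With the wrapper, the loop only starts once every value has been produced; without it, each value is processed as soon as it is yielded.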

Tony Yun

08/02/2023, 7:12 PM
oh I see. If I remove the @task, it runs iteratively
thank you, @Nate!

Nate

08/02/2023, 7:13 PM
sure thing!