Tony Yun
08/02/2023, 5:28 PM
yield
):
@task
def fetch_id():
    for i in range(5):
        print("fetch id: ", i)
        yield i

@task
def process(id):
    print(id)

@flow(log_prints=True)
def flow():
    for id in fetch_id():
        process(id)
I noticed the process task won’t execute until all ids have been generated from fetch_id(). I’m wondering what the yield is really doing here. It seems the values are still collected in memory like a list and then looped through. See the logs for more details:
12:18:31.217 | DEBUG | prefect.profiles - Using profile 'default'
12:18:31.662 | DEBUG | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////Users/tonyyun/.prefect/orion.db
12:18:31.875 | INFO | prefect.engine - Created flow run 'grumpy-hawk' for flow 'fax-ocr-flow'
12:18:31.876 | DEBUG | Flow run 'grumpy-hawk' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
12:18:31.877 | DEBUG | prefect.task_runner.concurrent - Starting task runner...
12:18:31.880 | DEBUG | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////Users/tonyyun/.prefect/orion.db
12:18:32.025 | DEBUG | Flow run 'grumpy-hawk' - Executing flow 'fax-ocr-flow' for flow run 'grumpy-hawk'...
12:18:32.026 | DEBUG | Flow run 'grumpy-hawk' - Beginning execution...
12:18:32.069 | INFO | Flow run 'grumpy-hawk' - Created task run 'fetch_id-0' for task 'fetch_id'
12:18:32.070 | INFO | Flow run 'grumpy-hawk' - Executing 'fetch_id-0' immediately...
12:18:32.132 | DEBUG | Task run 'fetch_id-0' - Beginning execution...
12:18:32.134 | INFO | Task run 'fetch_id-0' - fetch tax id: 0
12:18:32.135 | INFO | Task run 'fetch_id-0' - fetch tax id: 1
12:18:32.135 | INFO | Task run 'fetch_id-0' - fetch tax id: 2
12:18:32.136 | INFO | Task run 'fetch_id-0' - fetch tax id: 3
12:18:32.137 | INFO | Task run 'fetch_id-0' - fetch tax id: 4
12:18:32.180 | INFO | Task run 'fetch_id-0' - Finished in state Completed()
12:18:32.212 | INFO | Flow run 'grumpy-hawk' - Created task run 'download_pdf-0' for task 'download_pdf'
12:18:32.213 | INFO | Flow run 'grumpy-hawk' - Executing 'download_pdf-0' immediately...
12:18:32.272 | DEBUG | Task run 'download_pdf-0' - Beginning execution...
12:18:32.273 | INFO | Task run 'download_pdf-0' - Download fax: 0
12:18:32.309 | INFO | Task run 'download_pdf-0' - Finished in state Completed()
12:18:32.311 | INFO | Flow run 'grumpy-hawk' - Using sort strategy: categories=[PriorAuth(matching_word='Prior Authorization', s3_folder='/prior_auth/'), Refill(matching_word='Rx Refill', s3_folder='/refill/')]
12:18:32.339 | INFO | Flow run 'grumpy-hawk' - Created task run 'download_pdf-1' for task 'download_pdf'
12:18:32.340 | INFO | Flow run 'grumpy-hawk' - Executing 'download_pdf-1' immediately...
12:18:32.397 | DEBUG | Task run 'download_pdf-1' - Beginning execution...
12:18:32.398 | INFO | Task run 'download_pdf-1' - Download fax: 1
12:18:32.437 | INFO | Task run 'download_pdf-1' - Finished in state Completed()
12:18:32.439 | INFO | Flow run 'grumpy-hawk' - Using sort strategy: categories=[PriorAuth(matching_word='Prior Authorization', s3_folder='/prior_auth/'), Refill(matching_word='Rx Refill', s3_folder='/refill/')]
12:18:32.468 | INFO | Flow run 'grumpy-hawk' - Created task run 'download_pdf-2' for task 'download_pdf'
12:18:32.469 | INFO | Flow run 'grumpy-hawk' - Executing 'download_pdf-2' immediately...
12:18:32.528 | DEBUG | Task run 'download_pdf-2' - Beginning execution...
12:18:32.530 | INFO | Task run 'download_pdf-2' - Download fax: 2
12:18:32.569 | INFO | Task run 'download_pdf-2' - Finished in state Completed()
12:18:32.571 | INFO | Flow run 'grumpy-hawk' - Using sort strategy: categories=[PriorAuth(matching_word='Prior Authorization', s3_folder='/prior_auth/'), Refill(matching_word='Rx Refill', s3_folder='/refill/')]
12:18:32.603 | INFO | Flow run 'grumpy-hawk' - Created task run 'download_pdf-3' for task 'download_pdf'
12:18:32.605 | INFO | Flow run 'grumpy-hawk' - Executing 'download_pdf-3' immediately...
12:18:32.672 | DEBUG | Task run 'download_pdf-3' - Beginning execution...
12:18:32.673 | INFO | Task run 'download_pdf-3' - Download fax: 3
12:18:32.719 | INFO | Task run 'download_pdf-3' - Finished in state Completed()
12:18:32.721 | INFO | Flow run 'grumpy-hawk' - Using sort strategy: categories=[PriorAuth(matching_word='Prior Authorization', s3_folder='/prior_auth/'), Refill(matching_word='Rx Refill', s3_folder='/refill/')]
12:18:32.757 | INFO | Flow run 'grumpy-hawk' - Created task run 'download_pdf-4' for task 'download_pdf'
12:18:32.757 | INFO | Flow run 'grumpy-hawk' - Executing 'download_pdf-4' immediately...
12:18:32.829 | DEBUG | Task run 'download_pdf-4' - Beginning execution...
12:18:32.831 | INFO | Task run 'download_pdf-4' - Download fax: 4
12:18:32.870 | INFO | Task run 'download_pdf-4' - Finished in state Completed()
12:18:32.872 | INFO | Flow run 'grumpy-hawk' - Using sort strategy: categories=[PriorAuth(matching_word='Prior Authorization', s3_folder='/prior_auth/'), Refill(matching_word='Rx Refill', s3_folder='/refill/')]
12:18:32.916 | DEBUG | prefect.task_runner.concurrent - Shutting down task runner...
12:18:32.917 | INFO | Flow run 'grumpy-hawk' - Finished in state Completed('All states completed.')
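The logs above show every id being printed inside the single 'fetch_id-0' task run before any downstream task starts, which is what you would see if the generator were fully consumed before the flow loop begins. A minimal plain-Python sketch of that difference follows. It does not use Prefect at all, and the names `lazy_run`, `eager_run`, and the `log` list are hypothetical helpers introduced only for illustration; whether the task wrapper really collects the values this way is an assumption based on the logs.

```python
def fetch_id(log):
    """Plain generator: records each id at the moment it is produced."""
    for i in range(3):
        log.append(f"fetch {i}")
        yield i


def process(i, log):
    """Stand-in for the downstream work on one id."""
    log.append(f"process {i}")


def lazy_run():
    """Iterate the generator directly: fetch and process interleave."""
    log = []
    for i in fetch_id(log):
        process(i, log)
    return log


def eager_run():
    """Collect the generator first (assumed wrapper behaviour): all fetches happen up front."""
    log = []
    ids = list(fetch_id(log))  # the generator is exhausted on this line
    for i in ids:
        process(i, log)
    return log


if __name__ == "__main__":
    print(lazy_run())   # fetch/process pairs alternate
    print(eager_run())  # all fetches first, then all processes
```

The second ordering matches the shape of the logs above; the first is what a bare `yield` gives when nothing sits between the generator and the loop.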
Nate
08/02/2023, 6:41 PM
.map tasks over a generator like this
In [23]: @task
    ...: def add_42(v: int) -> int:
    ...:     return v + 42
    ...:
In [24]: @flow
    ...: def foo():
    ...:     futures = add_42.map((i for i in range(3)))
    ...:     print([f.result() for f in futures])
Tony Yun
08/02/2023, 6:44 PM
Nate
08/02/2023, 6:49 PM
Tony Yun
08/02/2023, 6:57 PM
def _yield():
    for i in range(3):
        print("yield: ", i)
        yield i

@flow(log_prints=True)
def my_flow():
    my_task.map(_yield())
It again yields every item before any of them is processed:
14:56:21.540 | INFO | Flow run 'sociable-raptor' - yield: 0
14:56:21.542 | INFO | Flow run 'sociable-raptor' - yield: 1
14:56:21.545 | INFO | Flow run 'sociable-raptor' - yield: 2
14:56:21.765 | INFO | Flow run 'sociable-raptor' - Submitted task run 'my_task-2' for execution.
14:56:21.994 | DEBUG | Task run 'my_task-1' - Beginning execution...
14:56:21.998 | INFO | Task run 'my_task-1' - 1
14:56:22.102 | DEBUG | Task run 'my_task-2' - Beginning execution...
14:56:22.106 | INFO | Task run 'my_task-2' - 2
Nate
08/02/2023, 7:08 PM
The submit method (used by map under the hood) just returns right away with a future. If instead you do
def _yield():
    for i in range(3):
        print("yield: ", i)
        yield i

@task
def my_task(x):
    print(x)

@flow(log_prints=True)
def my_flow():
    for i in _yield():
        my_task(i)
then you get
14:05:04.807 | INFO | prefect.engine - Created flow run 'fancy-harrier' for flow 'my-flow'
14:05:05.355 | INFO | Flow run 'fancy-harrier' - yield: 0
14:05:05.570 | INFO | Flow run 'fancy-harrier' - Created task run 'my_task-0' for task 'my_task'
14:05:05.572 | INFO | Flow run 'fancy-harrier' - Executing 'my_task-0' immediately...
14:05:06.000 | INFO | Task run 'my_task-0' - 0
14:05:06.245 | INFO | Task run 'my_task-0' - Finished in state Completed()
14:05:06.253 | INFO | Flow run 'fancy-harrier' - yield: 1
14:05:07.797 | INFO | Flow run 'fancy-harrier' - Created task run 'my_task-1' for task 'my_task'
14:05:07.798 | INFO | Flow run 'fancy-harrier' - Executing 'my_task-1' immediately...
14:05:10.348 | INFO | Task run 'my_task-1' - 1
14:05:10.852 | INFO | Task run 'my_task-1' - Finished in state Completed()
14:05:10.866 | INFO | Flow run 'fancy-harrier' - yield: 2
14:05:11.019 | INFO | Flow run 'fancy-harrier' - Created task run 'my_task-2' for task 'my_task'
14:05:11.021 | INFO | Flow run 'fancy-harrier' - Executing 'my_task-2' immediately...
14:05:11.453 | INFO | Task run 'my_task-2' - 2
14:05:11.608 | INFO | Task run 'my_task-2' - Finished in state Completed()
14:05:11.737 | INFO | Flow run 'fancy-harrier' - Finished in state Completed('All states completed.')
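The two orderings discussed above can be reproduced with the standard library alone. This is a rough analogy, not Prefect code: `concurrent.futures.ThreadPoolExecutor.submit` stands in for a non-blocking submit, and `ids`, `submit_style`, `blocking_style`, and the `gate` event are hypothetical names made up for the sketch. The gate holds the workers back so the ordering is deterministic; in a real run tasks may start earlier, but the point is that submission never waits for them.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def ids(events):
    """Generator that records when each value is produced."""
    for i in range(3):
        events.append(f"yield {i}")
        yield i


def submit_style():
    """Non-blocking submit: the generator is drained before any task finishes."""
    events = []
    gate = threading.Event()  # keeps tasks waiting until submission is done

    def work(i):
        gate.wait()
        events.append(f"task {i}")
        return i

    with ThreadPoolExecutor(max_workers=1) as pool:
        # submit() returns a future immediately, so this loop only pulls
        # values from the generator; if it blocked, the gate would deadlock.
        futures = [pool.submit(work, i) for i in ids(events)]
        gate.set()
        results = [f.result() for f in futures]
    return events, results


def blocking_style():
    """Direct call per item: each value is handled before the next is produced."""
    events = []
    for i in ids(events):
        events.append(f"task {i}")
    return events


if __name__ == "__main__":
    print(submit_style())
    print(blocking_style())
```

With the blocking loop, the generator is only advanced after the previous item has been handled, which is the interleaving seen in the 'fancy-harrier' logs.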
Tony Yun
08/02/2023, 7:10 PM
Nate
08/02/2023, 7:11 PM
Tony Yun
08/02/2023, 7:12 PM
Nate
08/02/2023, 7:13 PM