Kayvan Shah
06/13/2024, 10:59 PM
22:56:46.289 | INFO | prefect.engine - Created flow run 'rose-kestrel' for flow 'Mongo to CSV Pipeline'
22:56:46.345 | INFO | Flow run 'rose-kestrel' - Created task run 'Query DB-0' for task 'Query DB'
22:56:46.347 | INFO | Flow run 'rose-kestrel' - Executing 'Query DB-0' immediately...
22:56:46.596 | INFO | Task run 'Query DB-0' - Finished in state Completed()
22:56:46.621 | INFO | Flow run 'rose-kestrel' - Finished in state Completed('All states completed.')
jpuris
06/14/2024, 8:17 AM
# Save each batch to temporary CSV files
for i, batch in enumerate(documents):
...
Does documents have any content?
Kayvan Shah
06/14/2024, 8:19 AM
jpuris
06/14/2024, 8:21 AM
Kayvan Shah
06/14/2024, 8:23 AM
jpuris
06/14/2024, 8:23 AM
jpuris
06/14/2024, 8:38 AM
~/projects/prefect_sandbox via 🐍 v3.11.8 (.venv)
❯ python flow.py
10:32:05.704 | INFO | prefect.engine - Created flow run 'honest-sponge' for flow 'Mongo to CSV Pipeline'
10:32:05.741 | INFO | Flow run 'honest-sponge' - Created task run 'Save Batch to CSV-0' for task 'Save Batch to CSV'
10:32:05.741 | INFO | Flow run 'honest-sponge' - Executing 'Save Batch to CSV-0' immediately...
{'_id': 1, 'name': 'John Doe', 'email': 'john@doe.com'}
10:32:05.771 | INFO | Task run 'Save Batch to CSV-0' - Finished in state Completed()
Batch 0 saved to CSV
10:32:05.785 | INFO | Flow run 'honest-sponge' - Created task run 'Save Batch to CSV-1' for task 'Save Batch to CSV'
10:32:05.785 | INFO | Flow run 'honest-sponge' - Executing 'Save Batch to CSV-1' immediately...
{'_id': 2, 'name': 'Jane Doe', 'email': 'jane@doe.com'}
10:32:05.814 | INFO | Task run 'Save Batch to CSV-1' - Finished in state Completed()
Batch 1 saved to CSV
10:32:05.827 | INFO | Flow run 'honest-sponge' - Finished in state Completed('All states completed.')
Prefect version:
~/projects/prefect_sandbox via 🐍 v3.11.8 (.venv)
❯ prefect --version
2.19.5
I'd add some extra logging to make sure the MongoDB data is retrieved correctly when the flow runs the task loop.
jpuris
06/14/2024, 8:40 AM
sample.py shows some import issues, e.g. the os and shutil modules are never imported.
Kayvan Shah
06/14/2024, 8:56 AM
jpuris
06/14/2024, 9:48 AM
jpuris
06/14/2024, 10:47 AM
If the function returning pymongo.cursor.RawBatchCursor has Prefect's @task decorator, the returned cursor.RawBatchCursor will not be iterated in the flow's for-loop.
Comment out the task decorator and it works just fine.
Does not work
@task(name="Retrieve data from MongoDB")
def get_data_cursor(
Works fine
# @task(name="Retrieve data from MongoDB")
def get_data_cursor(
How to reproduce the issue:
1. Spin up MongoDB in the local environment:
   docker run -d --name mongodb -p 27017:27017 mongo:7.0.11
2. Create the dev environment:
   python -m venv .venv && source .venv/bin/activate && pip install prefect pymongo pandas
3. Run the broken version:
   PREFECT_LOGGING_LEVEL='DEBUG' python my_mongodb_flow.py
4. Run the fixed version:
   # comment out line 12 in my_mongodb_flow.py
   PREFECT_LOGGING_LEVEL='DEBUG' python my_mongodb_flow.py
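A stdlib-only sketch of one way to keep StopIteration from ever crossing the thread/Future boundary: hand the caller a lazy generator instead of a cursor it must advance across that boundary. Calling a generator function only builds a generator object, so nothing raises on the worker side, and the final StopIteration is consumed by the caller's own for loop. Here loop.run_in_executor stands in for Prefect's task execution and FAKE_BATCHES for collection.find_raw_batches(); both are illustrative assumptions, not how Prefect or pymongo work internally.

```python
import asyncio
from typing import Any, Dict, Iterator, List

# Hypothetical stand-in for collection.find_raw_batches(): each entry is one
# already-decoded batch of documents
FAKE_BATCHES: List[List[Dict[str, Any]]] = [
    [{"_id": 1, "name": "John Doe"}],
    [{"_id": 2, "name": "Jane Doe"}],
]


def query_batches() -> Iterator[List[Dict[str, Any]]]:
    # Generator function: calling it only builds a generator object, so no
    # StopIteration is raised on the worker thread
    for batch in FAKE_BATCHES:
        yield batch


async def main() -> List[Dict[str, Any]]:
    loop = asyncio.get_running_loop()
    # Only the (unstarted) generator object crosses the Future boundary
    batches = await loop.run_in_executor(None, query_batches)
    documents: List[Dict[str, Any]] = []
    # The for loop consumes the final StopIteration locally, on the caller's side
    for batch in batches:
        documents.extend(batch)
    return documents


documents = asyncio.run(main())
print(documents)
```

The trade-off is that the generator body then runs wherever it is iterated, so the cursor work moves to the caller's side.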
jpuris
06/14/2024, 11:01 AM
jpuris
06/14/2024, 11:24 AM
...
13:21:47.019 | ERROR | asyncio - Exception in callback _chain_future.<locals>._set_state(<Future pendi...ask_wakeup()]>, <Future at 0x...StopIteration>) at /Users/janispuris/.pyenv/versions/3.11.8/lib/python3.11/asyncio/futures.py:381
handle: <Handle _chain_future.<locals>._set_state(<Future pendi...ask_wakeup()]>, <Future at 0x...StopIteration>) at /Users/janispuris/.pyenv/versions/3.11.8/lib/python3.11/asyncio/futures.py:381>
Traceback (most recent call last):
  File "/Users/janispuris/.pyenv/versions/3.11.8/lib/python3.11/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/janispuris/.pyenv/versions/3.11.8/lib/python3.11/asyncio/futures.py", line 383, in _set_state
    _copy_future_state(other, future)
  File "/Users/janispuris/.pyenv/versions/3.11.8/lib/python3.11/asyncio/futures.py", line 359, in _copy_future_state
    dest.set_exception(_convert_future_exc(exception))
TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
13:21:56.395 | ERROR | Flow run 'energetic-swine' - Crash detected! Execution was cancelled by the runtime environment.
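The TypeError in this traceback is raised by asyncio itself. A minimal stdlib-only sketch of the same failure mode, assuming the call crosses from a worker thread into the event loop through a concurrent.futures.Future chained to an asyncio Future (which the _chain_future frames suggest; Prefect's internals are not shown in the thread). An empty iterator stands in for an exhausted pymongo cursor. On Python 3.11, as in these logs, asyncio logs the same "Exception in callback" error and the awaiting coroutine never resumes, so a timeout is used to observe it; newer Python releases may surface a RuntimeError instead.

```python
import asyncio
from typing import Any, Iterator


def next_document(it: Iterator[Any]) -> Any:
    # Like cursor.next() on a spent cursor: raises StopIteration when exhausted
    return next(it)


async def main() -> str:
    loop = asyncio.get_running_loop()
    exhausted: Iterator[Any] = iter([])  # stand-in for an exhausted pymongo cursor
    # run_in_executor chains a concurrent.futures.Future into an asyncio Future
    fut = loop.run_in_executor(None, next_document, exhausted)
    try:
        await asyncio.wait_for(fut, timeout=1.0)
    except asyncio.TimeoutError:
        # Python 3.11: copying StopIteration into the asyncio Future fails with
        # the TypeError, so the Future is never resolved and we time out
        return "timed out"
    except RuntimeError:
        # Newer Python releases may wrap the StopIteration in a RuntimeError instead
        return "RuntimeError"
    return "completed"


outcome = asyncio.run(main())
print(outcome)
```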
But if you comment out line 10, it will work without an issue.
❯ PREFECT_LOGGING_LEVEL='INFO' python my_mongodb_flow.py
Imported modules:
python version: 3.11.8 (main, Mar 15 2024, 15:43:16) [Clang 15.0.0 (clang-1500.3.9.4)]
prefect version: 2.19.5
pymongo version: 4.7.3
13:22:08.625 | INFO | prefect.engine - Created flow run 'bouncy-hyena' for flow 'Pipeline'
13:22:08.644 | INFO | Flow run 'bouncy-hyena' - Creating a MongoDB client
13:22:08.648 | INFO | Flow run 'bouncy-hyena' - Retrieving data from MongoDB
13:22:08.654 | INFO | Flow run 'bouncy-hyena' - Document: {'_id': 'featureCompatibilityVersion', 'version': '7.0'}
13:22:08.655 | INFO | Flow run 'bouncy-hyena' - Closing the MongoDB client
13:22:08.674 | INFO | Flow run 'bouncy-hyena' - Finished in state Completed()
That said, this is not exclusive to Prefect at all. It is the combination of asyncio and pymongo's cursor that makes this happen.
jpuris
06/14/2024, 11:29 AM
jpuris
06/14/2024, 11:38 AM
4.0.* version.
The latest Prefect version seems to install anyio 3.7.1.
pip install prefect
...
Collecting anyio<4.0.0,>=3.7.1 (from prefect)
Using cached anyio-3.7.1-py3-none-any.whl.metadata (4.7 kB)
...
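The repro script prints an "Imported modules" banner with the Python, Prefect and pymongo versions. A similar check that also reports which anyio version actually got resolved could look like this sketch, using importlib.metadata from the standard library. It is an assumption about how such a banner might be produced, not the script's actual code.

```python
import sys
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    # Map each distribution name to its installed version, or None if missing
    result: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None  # not installed in this environment
    return result


print("python version:", sys.version)
versions = installed_versions(["prefect", "pymongo", "anyio"])
for name, ver in versions.items():
    print(f"{name} version: {ver}")
```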
Kayvan Shah
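06/14/2024, 7:35 PM
from typing import Any, Dict, Iterator, List

import bson
from prefect import task

# MongoDBConnector is a project-specific helper class (not shown in the thread)

@task(name="Query DB - Get Cursor")
def query_mongo(
    uri: str, db_name: str, collection_name: str,
    filter: Dict[str, Any], projection: Dict[str, Any],
    batch_size: int
) -> Iterator[List[Dict[str, Any]]]:
    connector = MongoDBConnector(uri, db_name)
    collection = connector.get_collection(collection_name)
    cursor = collection.find_raw_batches(filter=filter, projection=projection, batch_size=batch_size)
    # Decode each raw BSON batch into a list of dicts inside the task
    for batch in cursor:
        yield bson.decode_all(batch)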
This is how I finally got it working.
Arvindra Sehmi
06/19/2024, 2:02 PM
Try calling cur.rewind() prior to logger.info("Document: %s", cur.next()).
jpuris
06/19/2024, 2:55 PM
.rewind() it to its initial state.
Arvindra Sehmi
06/19/2024, 3:41 PM
StopIteration exception. Adding .rewind() fixes it.
16:40:19.110 | DEBUG | prefect.profiles - Using profile 'default'
Imported modules:
python version: 3.11.5 | packaged by Anaconda, Inc. | (main, Sep 11 2023, 13:26:23) [MSC v.1916 64 bit (AMD64)]
prefect version: 2.19.4
pymongo version: 4.6.3
16:40:21.123 | DEBUG | prefect.client - Connecting to API at http://127.0.0.1:4200/api/
16:40:22.136 | INFO | prefect.engine - Created flow run 'feathered-hound' for flow 'Pipeline'
16:40:22.138 | INFO | Flow run 'feathered-hound' - View at http://127.0.0.1:4200/flow-runs/flow-run/a92917b1-dd4c-40c6-8656-8c682cd86cb0
16:40:22.139 | DEBUG | Flow run 'feathered-hound' - Starting 'SequentialTaskRunner'; submitted tasks will be run sequentially...
16:40:22.142 | DEBUG | prefect.task_runner.sequential - Starting task runner...
16:40:23.115 | DEBUG | prefect.client - Connecting to API at <http://127.0.0.1:4200/api/>
16:40:23.197 | DEBUG | Flow run 'feathered-hound' - Executing flow 'Pipeline' for flow run 'feathered-hound'...
16:40:23.198 | DEBUG | Flow run 'feathered-hound' - Beginning execution...
16:40:23.199 | INFO | Flow run 'feathered-hound' - Creating a MongoDB client
16:40:23.246 | INFO | Flow run 'feathered-hound' - Retrieving data from MongoDB
16:40:23.337 | INFO | Flow run 'feathered-hound' - Created task run 'Retrieve data from MongoDB-0' for task 'Retrieve data from MongoDB'
16:40:23.338 | INFO | Flow run 'feathered-hound' - Executing 'Retrieve data from MongoDB-0' immediately...
16:40:23.433 | DEBUG | Task run 'Retrieve data from MongoDB-0' - Beginning execution...
16:40:23.487 | INFO | Task run 'Retrieve data from MongoDB-0' - Finished in state Completed()
16:40:23.489 | DEBUG | Flow run 'feathered-hound' - Processing the documents
16:40:23.490 | ERROR | asyncio - Exception in callback _chain_future.<locals>._set_state(<Future pendi...ask_wakeup()]>, <Future at 0x...StopIteration>) at C:\Users\a.sehmi\anaconda3\Lib\asyncio\futures.py:381
handle: <Handle _chain_future.<locals>._set_state(<Future pendi...ask_wakeup()]>, <Future at 0x...StopIteration>) at C:\Users\a.sehmi\anaconda3\Lib\asyncio\futures.py:381>
Traceback (most recent call last):
  File "C:\Users\a.sehmi\anaconda3\Lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\a.sehmi\anaconda3\Lib\asyncio\futures.py", line 383, in _set_state
    _copy_future_state(other, future)
  File "C:\Users\a.sehmi\anaconda3\Lib\asyncio\futures.py", line 359, in _copy_future_state
    dest.set_exception(_convert_future_exc(exception))
TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
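An alternative to .rewind() is to read from the cursor with Python's built-in next() and a default, which returns the default instead of raising StopIteration when the cursor is exhausted (pymongo cursors are iterators, so next(cur, None) applies). Unlike .rewind(), this does not re-read the data; it only keeps StopIteration from escaping. A stdlib-only sketch, with an empty iterator standing in for the spent cursor and run_in_executor standing in for Prefect's thread hand-off (both assumptions):

```python
import asyncio
from typing import Any, Iterator, Optional


def first_document(cursor: Iterator[Any]) -> Optional[Any]:
    # next() with a default returns None on exhaustion instead of raising StopIteration
    return next(cursor, None)


async def main() -> Optional[Any]:
    loop = asyncio.get_running_loop()
    exhausted: Iterator[Any] = iter([])  # stand-in for an already-consumed cursor
    # No StopIteration is raised, so the Future resolves normally with None
    return await loop.run_in_executor(None, first_document, exhausted)


result = asyncio.run(main())
print("Document:", result)
```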