k
Hi, I don't know why the data from the cursor isn't passed to the next save_data step. Below are the logs:
Copy code
22:56:46.289 | INFO    | prefect.engine - Created flow run 'rose-kestrel' for flow 'Mongo to CSV Pipeline'
22:56:46.345 | INFO    | Flow run 'rose-kestrel' - Created task run 'Query DB-0' for task 'Query DB'
22:56:46.347 | INFO    | Flow run 'rose-kestrel' - Executing 'Query DB-0' immediately...
22:56:46.596 | INFO    | Task run 'Query DB-0' - Finished in state Completed()
22:56:46.621 | INFO    | Flow run 'rose-kestrel' - Finished in state Completed('All states completed.')
j
Copy code
# Save each batch to temporary CSV files
for i, batch in enumerate(documents):
    ...
does documents have any content?
k
It is supposed to have more than 5K records here; documents is a generator.
j
I'd check if the loop runs. Looking at the logs you've provided, the for-loop does not iterate over whatever enumerate returns.
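One way to check whether the generator is actually empty without losing its first element is a small peek helper. This is a generic sketch with a stand-in plain generator, nothing pymongo-specific:

```python
from itertools import chain

def peek(iterable):
    """Return (first_item, restored_iterable); first_item is None if empty."""
    it = iter(iterable)
    try:
        first = next(it)
    except StopIteration:
        return None, iter(())
    # Re-attach the consumed item so the caller can still iterate everything
    return first, chain([first], it)

documents = ({"_id": i} for i in range(3))  # stand-in for the real cursor
first, documents = peek(documents)
print(first)                       # {'_id': 0} -> the generator is not empty
print(sum(1 for _ in documents))   # 3 -> nothing was lost
```

Logging `first` this way would confirm whether anything comes out of the cursor at all before the for-loop.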
k
It runs as expected when the decorators are dropped
j
I see! Let me try to reproduce this. It has been a long time since I've run tasks from a loop 😐
@Kayvan Shah I'm not able to reproduce the problem.
Copy code
~/projects/prefect_sandbox via 🐍 v3.11.8 (.venv)
❯ python flow.py
10:32:05.704 | INFO    | prefect.engine - Created flow run 'honest-sponge' for flow 'Mongo to CSV Pipeline'
10:32:05.741 | INFO    | Flow run 'honest-sponge' - Created task run 'Save Batch to CSV-0' for task 'Save Batch to CSV'
10:32:05.741 | INFO    | Flow run 'honest-sponge' - Executing 'Save Batch to CSV-0' immediately...
{'_id': 1, 'name': 'John Doe', 'email': 'john@doe.com'}
10:32:05.771 | INFO    | Task run 'Save Batch to CSV-0' - Finished in state Completed()
Batch 0 saved to CSV
10:32:05.785 | INFO    | Flow run 'honest-sponge' - Created task run 'Save Batch to CSV-1' for task 'Save Batch to CSV'
10:32:05.785 | INFO    | Flow run 'honest-sponge' - Executing 'Save Batch to CSV-1' immediately...
{'_id': 2, 'name': 'Jane Doe', 'email': 'jane@doe.com'}
10:32:05.814 | INFO    | Task run 'Save Batch to CSV-1' - Finished in state Completed()
Batch 1 saved to CSV
10:32:05.827 | INFO    | Flow run 'honest-sponge' - Finished in state Completed('All states completed.')
prefect version
Copy code
~/projects/prefect_sandbox via 🐍 v3.11.8 (.venv)
❯ prefect --version
2.19.5
I'd add some additional logging to make sure the MongoDB data is retrieved correctly when running the task loop from the flow.
Also, your sample.py shows some import issues, e.g. the os and shutil modules have not been imported.
k
If it is a list, it works. But when iterating over a generator variable (in the example above it is a MongoDB cursor), it doesn't work.
j
That is really strange. I think I will need to try to reproduce this with MongoDB's RawBatchCursor then. This may take a little bit of time for me.
@Kayvan Shah I can replicate the issue! It is exactly as you mention. When the function creating and returning the pymongo.cursor.RawBatchCursor has prefect's @task decorator, the returned RawBatchCursor will not be iterated over in the flow's for-loop. Comment out the task decorator, and it will work just fine.
Does not work
Copy code
@task(name="Retrieve data from MongoDB")
def get_data_cursor(
Works fine
Copy code
# @task(name="Retrieve data from MongoDB")
def get_data_cursor(
How to reproduce the issue:
spin up MongoDB in a local env
Copy code
docker run -d --name mongodb -p 27017:27017 mongo:7.0.11
create dev env
Copy code
python -m venv .venv && source .venv/bin/activate && pip install prefect pymongo pandas
run broken version
Copy code
PREFECT_LOGGING_LEVEL='DEBUG' python my_mongodb_flow.py
run fixed version
Copy code
# comment out line 12 in my_mongodb_flow.py
PREFECT_LOGGING_LEVEL='DEBUG' python my_mongodb_flow.py
The only thing I can suggest is to create the cursor in the same task you are writing the CSV files in. 🤷
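A sketch of that workaround: create and fully consume the cursor inside one task, so the cursor never crosses the task/flow boundary. Everything here is illustrative (`save_batches_to_csv`, the fake batches); in the real task the batches would come from `collection.find_raw_batches(...)` plus `bson.decode_all`, and `query_and_save` would carry the `@task` decorator:

```python
import csv
import os

def save_batches_to_csv(batches, out_dir):
    """Write each batch (a list of dicts) to its own CSV file; return the paths."""
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for i, batch in enumerate(batches):
        if not batch:
            continue
        path = os.path.join(out_dir, f"batch_{i}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(batch[0].keys()))
            writer.writeheader()
            writer.writerows(batch)
        paths.append(path)
    return paths

# In the real flow, this whole function would be ONE prefect task: the
# cursor is created AND consumed here, so the flow only ever sees the
# returned list of file paths, not a live cursor.
def query_and_save(out_dir):
    fake_batches = [[{"_id": 1, "name": "John Doe"}],
                    [{"_id": 2, "name": "Jane Doe"}]]  # stand-in for decoded batches
    return save_batches_to_csv(fake_batches, out_dir)
```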
Seems to be a problem with the way tasks are executed in prefect. Tasks are run asynchronously, which seems to break the returned cursor. The following MRE will result in:
Copy code
...
13:21:47.019 | ERROR   | asyncio - Exception in callback _chain_future.<locals>._set_state(<Future pendi...ask_wakeup()]>, <Future at 0x...StopIteration>) at /Users/janispuris/.pyenv/versions/3.11.8/lib/python3.11/asyncio/futures.py:381
handle: <Handle _chain_future.<locals>._set_state(<Future pendi...ask_wakeup()]>, <Future at 0x...StopIteration>) at /Users/janispuris/.pyenv/versions/3.11.8/lib/python3.11/asyncio/futures.py:381>
Traceback (most recent call last):
  File "/Users/janispuris/.pyenv/versions/3.11.8/lib/python3.11/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/janispuris/.pyenv/versions/3.11.8/lib/python3.11/asyncio/futures.py", line 383, in _set_state
    _copy_future_state(other, future)
  File "/Users/janispuris/.pyenv/versions/3.11.8/lib/python3.11/asyncio/futures.py", line 359, in _copy_future_state
    dest.set_exception(_convert_future_exc(exception))
TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
13:21:56.395 | ERROR   | Flow run 'energetic-swine' - Crash detected! Execution was cancelled by the runtime environment.
But if you comment out line 10, it will work without an issue.
Copy code
āÆ PREFECT_LOGGING_LEVEL='INFO' python my_mongodb_flow.py
Imported modules:
python version: 3.11.8 (main, Mar 15 2024, 15:43:16) [Clang 15.0.0 (clang-1500.3.9.4)]
prefect version: 2.19.5
pymongo version: 4.7.3
13:22:08.625 | INFO    | prefect.engine - Created flow run 'bouncy-hyena' for flow 'Pipeline'
13:22:08.644 | INFO    | Flow run 'bouncy-hyena' - Creating a MongoDB client
13:22:08.648 | INFO    | Flow run 'bouncy-hyena' - Retrieving data from MongoDB
13:22:08.654 | INFO    | Flow run 'bouncy-hyena' - Document: {'_id': 'featureCompatibilityVersion', 'version': '7.0'}
13:22:08.655 | INFO    | Flow run 'bouncy-hyena' - Closing the MongoDB client
13:22:08.674 | INFO    | Flow run 'bouncy-hyena' - Finished in state Completed()
That said, this is not exclusive to prefect at all. It is asyncio + pymongo's cursor that makes this happen.
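The underlying incompatibility can be shown with neither prefect nor pymongo involved: a StopIteration escaping into async machinery is rejected outright (PEP 479 territory). A minimal, self-contained illustration; note the coroutine path converts it to RuntimeError, while the Future-based path produces the TypeError seen in the traceback above:

```python
import asyncio

async def take_next(it):
    # Calling next() on an exhausted iterator raises StopIteration inside
    # the coroutine; Python refuses to let it escape a coroutine and
    # converts it to RuntimeError("coroutine raised StopIteration").
    return next(it)

async def main():
    exhausted = iter([])
    try:
        await take_next(exhausted)
    except RuntimeError as exc:
        print(type(exc).__name__)  # RuntimeError

asyncio.run(main())
```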
The upstream problem with asyncio has been fixed in anyio 4.0.*, but the latest prefect version seems to install anyio 3.7.1.
Copy code
pip install prefect
...
Collecting anyio<4.0.0,>=3.7.1 (from prefect)
  Using cached anyio-3.7.1-py3-none-any.whl.metadata (4.7 kB)
...
k
Copy code
from typing import Any, Dict

import bson
from prefect import task

# MongoDBConnector is my own helper class (definition not shown)

@task(name="Query DB - Get Cursor")
def query_mongo(
    uri: str, db_name: str, collection_name: str,
    filter: Dict[str, Any], projection: Dict[str, Any],
    batch_size: int
) -> Any:
    connector = MongoDBConnector(uri, db_name)
    collection = connector.get_collection(collection_name)
    cursor = collection.find_raw_batches(filter=filter, projection=projection, batch_size=batch_size)

    # Decode each raw BSON batch inside the task and yield plain lists of
    # dicts, so the flow never has to iterate a live cursor.
    for batch in cursor:
        yield bson.decode_all(batch)
This is how I finally got it working.
🙌 1
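On the flow side, batches from such a generator task are then consumed with a plain for-loop. A stand-in sketch with no MongoDB needed (the batching logic mirrors what find_raw_batches + decode_all produce, i.e. lists of dicts):

```python
def query_batches(docs, batch_size):
    """Stand-in for the generator task: yield the documents in batches."""
    for start in range(0, len(docs), batch_size):
        yield docs[start:start + batch_size]

docs = [{"_id": i} for i in range(5)]
for i, batch in enumerate(query_batches(docs, batch_size=2)):
    print(f"Batch {i}: {len(batch)} docs")
# Batch 0: 2 docs
# Batch 1: 2 docs
# Batch 2: 1 docs
```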
a
@jpuris I tried your last example and found I had to run cur.rewind() prior to logger.info("Document: %s", cur.next()).
j
@Arvindra Sehmi If you've iterated over the cursor at some point after initializing it, then indeed, you'll need to .rewind() it to its initial state.
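The one-shot nature of iteration is easy to see with plain Python; pymongo's Cursor.rewind() is the extra affordance cursors add on top, with no equivalent for ordinary generators:

```python
it = iter([1, 2, 3])
assert list(it) == [1, 2, 3]   # first pass consumes everything
assert list(it) == []          # second pass: the iterator is exhausted

# A pymongo Cursor behaves the same way once iterated, but calling
# cursor.rewind() resets it to its unevaluated state so the query
# can be re-run from the start.
```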
a
@jpuris I ran your code as is. You can see the StopIteration exception. Adding .rewind() fixes it.
Copy code
16:40:19.110 | DEBUG   | prefect.profiles - Using profile 'default'
Imported modules:
python version: 3.11.5 | packaged by Anaconda, Inc. | (main, Sep 11 2023, 13:26:23) [MSC v.1916 64 bit (AMD64)]
prefect version: 2.19.4
pymongo version: 4.6.3
16:40:21.123 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:4200/api/
16:40:22.136 | INFO    | prefect.engine - Created flow run 'feathered-hound' for flow 'Pipeline'
16:40:22.138 | INFO    | Flow run 'feathered-hound' - View at http://127.0.0.1:4200/flow-runs/flow-run/a92917b1-dd4c-40c6-8656-8c682cd86cb0
16:40:22.139 | DEBUG   | Flow run 'feathered-hound' - Starting 'SequentialTaskRunner'; submitted tasks will be run sequentially...
16:40:22.142 | DEBUG   | prefect.task_runner.sequential - Starting task runner...
16:40:23.115 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:4200/api/
16:40:23.197 | DEBUG   | Flow run 'feathered-hound' - Executing flow 'Pipeline' for flow run 'feathered-hound'...
16:40:23.198 | DEBUG   | Flow run 'feathered-hound' - Beginning execution...
16:40:23.199 | INFO    | Flow run 'feathered-hound' - Creating a MongoDB client
16:40:23.246 | INFO    | Flow run 'feathered-hound' - Retrieving data from MongoDB
16:40:23.337 | INFO    | Flow run 'feathered-hound' - Created task run 'Retrieve data from MongoDB-0' for task 'Retrieve data from MongoDB'
16:40:23.338 | INFO    | Flow run 'feathered-hound' - Executing 'Retrieve data from MongoDB-0' immediately...
16:40:23.433 | DEBUG   | Task run 'Retrieve data from MongoDB-0' - Beginning execution...
16:40:23.487 | INFO    | Task run 'Retrieve data from MongoDB-0' - Finished in state Completed()
16:40:23.489 | DEBUG   | Flow run 'feathered-hound' - Processing the documents
16:40:23.490 | ERROR   | asyncio - Exception in callback _chain_future.<locals>._set_state(<Future pendi...ask_wakeup()]>, <Future at 0x...StopIteration>) at C:\Users\a.sehmi\anaconda3\Lib\asyncio\futures.py:381
handle: <Handle _chain_future.<locals>._set_state(<Future pendi...ask_wakeup()]>, <Future at 0x...StopIteration>) at C:\Users\a.sehmi\anaconda3\Lib\asyncio\futures.py:381>
Traceback (most recent call last):
  File "C:\Users\a.sehmi\anaconda3\Lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\a.sehmi\anaconda3\Lib\asyncio\futures.py", line 383, in _set_state
    _copy_future_state(other, future)
  File "C:\Users\a.sehmi\anaconda3\Lib\asyncio\futures.py", line 359, in _copy_future_state
    dest.set_exception(_convert_future_exc(exception))
TypeError: StopIteration interacts badly with generators and cannot be raised into a Future