Arthur
06/26/2024, 11:47 AM
```
asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b'accepted: 0...pted: 1 (1)\n')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b'accepted: 0...pted: 1 (1)\n')>
Traceback (most recent call last):
  File "/usr/lib/python3.11/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/usr/lib/python3.11/asyncio/subprocess.py", line 72, in pipe_data_received
    reader.feed_data(data)
  File "/usr/lib/python3.11/asyncio/streams.py", line 498, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
           ^^^^^^^^^^^^^
AssertionError: feed_data after feed_eof
```
What is going wrong?
Marvin
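06/26/2024, 11:47 AM
This error comes from `asyncio` in Python. An `AssertionError: feed_data after feed_eof` is raised when `feed_data()` is called on an `asyncio` stream reader after its `feed_eof()` method has already been called. In other words, data arrived for a stream that had already been marked as finished (end of file). In your traceback, the subprocess transport delivered more stdout data (`pipe_data_received(1, ...)`, where `1` is the stdout file descriptor) to a reader that had already received EOF.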
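The sketch below reproduces the assertion in isolation. It drives a bare `asyncio.StreamReader` by hand: it feeds one chunk, marks EOF, then feeds another chunk. That is the same sequence the subprocess protocol hit in the traceback.

Some caveats:
- The asyncio docs discourage constructing `StreamReader` directly in application code. It is done here only to reproduce the error.
- The chunk contents are made up.
- Running Python with `-O` strips `assert` statements, so the check would not fire under that flag.

```python
import asyncio

async def reproduce() -> str:
    # Created inside a running loop so the reader binds to that loop
    reader = asyncio.StreamReader()
    reader.feed_data(b"accepted: 0\n")  # normal delivery
    reader.feed_eof()                    # the stream is now marked as finished
    try:
        reader.feed_data(b"accepted: 1\n")  # late data arriving after EOF
    except AssertionError as exc:
        return str(exc)
    return "no error raised"

if __name__ == "__main__":
    print(asyncio.run(reproduce()))  # feed_data after feed_eof
```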
From my search, it appears that this issue has been observed and discussed. There's a relevant GitHub issue that might shed some more light on this: Excessive amount of AssertionError: feed_data before feed_eof in the log.
### Recommendations
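1. Review Your Thread Management:
Make sure your threading logic cannot hand data to a stream after that stream has been closed. This typically happens when thread lifecycles are not managed correctly. One common case is a worker thread calling methods on `asyncio` stream objects directly instead of going through the event loop; most `asyncio` objects are not thread-safe.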
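Here is a minimal standard-library sketch of that idea. It assumes the data comes from a worker thread you control. The `produce`/`consume` names and the sample chunks are made up, and this is not how Prefect or `asyncio.subprocess` wire things internally.

The worker never touches the reader itself. Instead, it schedules every `feed_data` call, plus a single final `feed_eof`, onto the event loop with `loop.call_soon_threadsafe`. The loop runs those callbacks in the order they were scheduled, so no chunk can land after EOF.

```python
import asyncio
import threading

def produce(loop: asyncio.AbstractEventLoop, reader: asyncio.StreamReader, chunks: list[bytes]) -> None:
    # Runs in a worker thread. It only schedules calls; the event loop
    # executes them in FIFO order on its own thread.
    for chunk in chunks:
        loop.call_soon_threadsafe(reader.feed_data, chunk)
    # EOF is scheduled exactly once, after the last chunk
    loop.call_soon_threadsafe(reader.feed_eof)

async def consume(chunks: list[bytes]) -> list[str]:
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    worker = threading.Thread(target=produce, args=(loop, reader, chunks))
    worker.start()
    lines = []
    # readline() returns b"" once EOF has been fed and the buffer is drained
    while line := await reader.readline():
        lines.append(line.decode().rstrip("\n"))
    # The worker has already scheduled its last call, so this join is brief
    await asyncio.to_thread(worker.join)
    return lines

if __name__ == "__main__":
    print(asyncio.run(consume([b"accepted: 0\n", b"accepted: 1\n"])))
```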
2. Manage Concurrency Properly:
Use concurrency limits to manage your tasks effectively. For example:
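```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency
import psycopg2

@task
def database_query(query):
    # Occupy one slot of the "database" global concurrency limit while the query runs
    with concurrency("database", occupy=1):
        connection = psycopg2.connect("<connection_string>")
        cursor = connection.cursor()
        cursor.execute(query)
        result = cursor.fetchall()
        connection.close()
        return result

@flow
def my_flow():
    queries = ["SELECT * FROM table1", "SELECT * FROM table2", "SELECT * FROM table3"]
    for query in queries:
        database_query.submit(query)

if __name__ == "__main__":
    my_flow()
```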
This caps how many tasks can hold the `database` limit at the same time, so the shared resource is not overloaded.
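The same idea can be seen without a Prefect server by using a plain `asyncio.Semaphore` as a stand-in for the `database` limit. This is only an analogy: Prefect's global concurrency limits are managed through the Prefect API and apply across runs, which a process-local semaphore cannot do. The `run_queries` helper and the simulated query are hypothetical.

The sketch records the peak number of queries in flight, which never exceeds the limit.

```python
import asyncio

async def run_queries(queries: list[str], limit: int) -> tuple[list[str], int]:
    # Stand-in for the "database" limit: at most `limit` holders at once
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def database_query(query: str) -> str:
        nonlocal active, peak
        async with semaphore:  # analogous to occupying one slot
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated query time
            active -= 1
            return f"result of {query}"

    results = await asyncio.gather(*(database_query(q) for q in queries))
    return results, peak

if __name__ == "__main__":
    results, peak = asyncio.run(run_queries(["SELECT 1", "SELECT 2", "SELECT 3"], limit=1))
    print(peak)  # 1
```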
3. Iterate Properly Over Inputs in Asynchronous Flows:
Assign the `receive_input` iterator to a variable outside the loop to maintain your position across iterations:
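```python
from prefect import flow
from prefect.deployments.deployments import run_deployment
from prefect.input.run_input import receive_input, send_input

EXIT_SIGNAL = "__EXIT__"

@flow
async def sender():
    greeter_flow_run = await run_deployment("greeter/send-receive", timeout=0, as_subflow=False)
    # Create the iterator once, outside the loop, so each pass resumes where the last one stopped
    receiver = receive_input(str, with_metadata=True)
    while True:
        name = input("What is your name? ")
        if name in ("q", "quit"):
            await send_input(EXIT_SIGNAL, flow_run_id=greeter_flow_run.id)
            break
        await send_input(name, flow_run_id=greeter_flow_run.id)
        greeting = await receiver.next()  # newest greeting, not the first one again
        print(greeting.value)
```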
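Why the placement matters can be shown with a plain Python generator standing in for `receive_input`. The `fake_receive_input` helper and the greetings are hypothetical, and the real `receive_input` polls the Prefect API rather than reading a list.

Creating the iterator inside the loop starts over on every pass, so you keep getting the first input. Creating it once outside the loop keeps its position.

```python
from collections.abc import Iterator

def fake_receive_input(inputs: list[str]) -> Iterator[str]:
    # Hypothetical stand-in for receive_input: yields inputs in arrival order
    yield from inputs

GREETINGS = ["Hello, Ann!", "Hello, Bob!", "Hello, Cy!"]

def iterator_inside_loop() -> list[str]:
    seen = []
    for _ in range(3):
        receiver = fake_receive_input(GREETINGS)  # new iterator every pass: restarts
        seen.append(next(receiver))
    return seen

def iterator_outside_loop() -> list[str]:
    seen = []
    receiver = fake_receive_input(GREETINGS)  # created once: position is kept
    for _ in range(3):
        seen.append(next(receiver))
    return seen

if __name__ == "__main__":
    print(iterator_inside_loop())   # ['Hello, Ann!', 'Hello, Ann!', 'Hello, Ann!']
    print(iterator_outside_loop())  # ['Hello, Ann!', 'Hello, Bob!', 'Hello, Cy!']
```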
For more details and resources, you could refer to the Prefect documentation on global concurrency limits.
If the problem persists despite these adjustments, you might need to dig deeper into how your code handles streams and threads with `asyncio`.
I hope this helps. If you need further assistance, feel free to ask.
Arthur
06/26/2024, 11:49 AM
Marvin
06/26/2024, 11:49 AM
The `receive_input` function in Prefect iterates over all known inputs to a specific flow run. The object it returns can be used in both synchronous and asynchronous contexts; the key distinction is how you write the flow itself.
### Synchronous Example
You can use `receive_input` in synchronous flows by calling `next()` explicitly to get the next input:
```python
from prefect import flow
from prefect.input.run_input import receive_input

@flow
def my_sync_flow():
    receiver = receive_input(str)
    while True:
        try:
            input_value = next(receiver)
            # Process the input_value
        except StopIteration:
            break

if __name__ == "__main__":
    my_sync_flow()
```
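The `while True` / `next()` / `StopIteration` pattern above is exactly what a `for` loop does for you, so either form works on any Python iterator. The sketch below shows the equivalence using an ordinary list iterator in place of `receiver`. It does not exercise Prefect itself, and the function names are made up.

```python
from collections.abc import Iterator

def drain_with_next(receiver: Iterator[str]) -> list[str]:
    # Explicit form, as in my_sync_flow: call next() until StopIteration
    values = []
    while True:
        try:
            values.append(next(receiver))
        except StopIteration:
            break
    return values

def drain_with_for(receiver: Iterator[str]) -> list[str]:
    # Implicit form: the for loop calls next() and handles StopIteration itself
    values = []
    for value in receiver:
        values.append(value)
    return values

if __name__ == "__main__":
    print(drain_with_next(iter(["Ann", "Bob"])))  # ['Ann', 'Bob']
    print(drain_with_for(iter(["Ann", "Bob"])))   # ['Ann', 'Bob']
```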
### Asynchronous Example
In an asynchronous flow, you would typically use `async for` to iterate over the inputs:
```python
from prefect import flow
from prefect.deployments.deployments import run_deployment
from prefect.input.run_input import receive_input

EXIT_SIGNAL = "__EXIT__"

@flow
async def sender():
    greeter_flow_run = await run_deployment("greeter/send-receive", timeout=0, as_subflow=False)
    receiver = receive_input(str, with_metadata=True)
    async for input_value in receiver:
        # With with_metadata=True, the received string is on .value
        if input_value.value == EXIT_SIGNAL:
            break
        # Process the input_value
        print(input_value.value)
```
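`async for` works on anything that implements the asynchronous iterator protocol (`__aiter__`/`__anext__`). The sketch below uses a hypothetical async generator, `fake_receive_input`, in place of `receive_input` so it runs without a Prefect server. It stops on the same `EXIT_SIGNAL` sentinel defined in the example.

```python
import asyncio
from collections.abc import AsyncIterator

EXIT_SIGNAL = "__EXIT__"

async def fake_receive_input(inputs: list[str]) -> AsyncIterator[str]:
    # Hypothetical stand-in for receive_input: yields each input as it "arrives"
    for value in inputs:
        await asyncio.sleep(0)  # hand control back to the loop, like waiting on a poll
        yield value

async def collect(inputs: list[str]) -> list[str]:
    received = []
    async for value in fake_receive_input(inputs):
        if value == EXIT_SIGNAL:
            break  # stop consuming once the sender signals it is done
        received.append(value)
    return received

if __name__ == "__main__":
    print(asyncio.run(collect(["Ann", "Bob", EXIT_SIGNAL, "ignored"])))  # ['Ann', 'Bob']
```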
### Documentation
For more information, see the Prefect documentation on creating interactive workflows.
### Summary
In brief, `receive_input` can be used both synchronously and asynchronously. Make sure your handling of inputs matches the flow: call `next()` in a synchronous flow, and use `async for` inside an `async def` flow.
If you have any more questions or need further details, feel free to ask!
Arthur
06/26/2024, 11:51 AM
In `def my_sync_flow():`, can I insert any function instead of `input_value = next(receiver)`?
Marvin
06/26/2024, 11:52 AM
Yes, you can replace `input_value = next(receiver)` with your own function, as long as it still retrieves the next value from the `receiver` iterator. That line's job is to fetch the next input. Whatever replaces it should keep doing that, and should let `StopIteration` reach the `except` clause so the loop can end.
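Here is a sketch of what such a replacement could look like. `next_clean_input` is a hypothetical helper, not part of Prefect. It still pulls exactly one item with `next()` and then normalizes it. Because it does not catch `StopIteration`, the exception reaches the loop's `except StopIteration: break` unchanged. A plain list iterator stands in for `receiver` so the example runs on its own.

```python
from collections.abc import Iterator

def next_clean_input(receiver: Iterator[str]) -> str:
    # Replaces `input_value = next(receiver)`: fetch one input, then tidy it up.
    # StopIteration from next() is deliberately left to propagate.
    return next(receiver).strip().lower()

def drain(receiver: Iterator[str]) -> list[str]:
    processed = []
    while True:
        try:
            input_value = next_clean_input(receiver)
        except StopIteration:
            break
        processed.append(input_value)
    return processed

if __name__ == "__main__":
    print(drain(iter(["  Hello ", "WORLD"])))  # ['hello', 'world']
```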
### Example with Custom Function
Let's say you have a custom function `process_input` to handle the input values:
```python
def process_input(input_value):
    print(f"Processing input: {input_value}")
    # Add your custom logic here
```
You can call this function inside your loop:
```python
from prefect import flow
from prefect.input.run_input import receive_input

def process_input(input_value):
    print(f"Processing input: {input_value}")
    # Add your custom logic here

@flow
def my_sync_flow():
    receiver = receive_input(str)
    while True:
        try:
            input_value = next(receiver)
            process_input(input_value)
        except StopIteration:
            break

if __name__ == "__main__":
    my_sync_flow()
```
### Integrating Additional Logic
You can also add more complex logic, error handling, or other function calls as needed. Here's an example that includes error handling and logging:
```python
import logging

from prefect import flow
from prefect.input.run_input import receive_input

# The root logger defaults to WARNING, which would hide logging.info messages
logging.basicConfig(level=logging.INFO)

def process_input(input_value):
    logging.info(f"Processing input: {input_value}")
    # Add your custom logic here
    if input_value == "error":  # Placeholder for actual condition
        raise ValueError("An error occurred while processing input")

@flow
def my_sync_flow():
    receiver = receive_input(str)
    while True:
        try:
            input_value = next(receiver)
            process_input(input_value)
        except ValueError as e:
            logging.error(f"Error processing input: {e}")
        except StopIteration:
            logging.info("No more inputs to process.")
            break

if __name__ == "__main__":
    my_sync_flow()
```
### Summary
Feel free to replace `input_value = next(receiver)` with any custom function or logic your application needs. The key is that the replacement still retrieves the next input when needed and handles it appropriately.
Let me know if you need further assistance!