@Marvin I'm seeing a strange issue when using...
# ask-marvin
a
@Marvin I'm seeing a strange issue when using threading with Python. A flow runs successfully for a couple of minutes and then gets errors like:
asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b'accepted: 0...pted: 1 (1)\n')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b'accepted: 0...pted: 1 (1)\n')>
Traceback (most recent call last):
  File "/usr/lib/python3.11/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/usr/lib/python3.11/asyncio/subprocess.py", line 72, in pipe_data_received
    reader.feed_data(data)
  File "/usr/lib/python3.11/asyncio/streams.py", line 498, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
           ^^^^^^^^^^^^^
AssertionError: feed_data after feed_eof
what is going wrong?
m
The error you're encountering appears to be related to using threading with `asyncio` in Python. An `AssertionError: feed_data after feed_eof` is typically raised when `feed_data` is called on an `asyncio` stream after `feed_eof` has already been called, meaning the stream had been marked as ended and was not expected to receive more data. From my search, it appears that this issue has been observed and discussed. There's a relevant GitHub issue that might shed some more light on this: Excessive amount of AssertionError: feed_data before feed_eof in the log.
### Recommendations
1. Review Your Thread Management: Ensure that your threading logic doesn't attempt to handle data after the stream is closed. This typically happens when the thread lifecycle is not managed correctly.
2. Manage Concurrency Properly: Use concurrency limits to manage your tasks effectively. For example:
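python
   import psycopg2
   from prefect import flow, task
   from prefect.concurrency.sync import concurrency

   @task
   def database_query(query):
       # Request one slot on the "database" concurrency limit; this blocks
       # while all slots are occupied.
       with concurrency("database", occupy=1):
           connection = psycopg2.connect("<connection_string>")
           cursor = connection.cursor()
           cursor.execute(query)
           result = cursor.fetchall()
           connection.close()
       return result

   @flow
   def my_flow():
       queries = ["SELECT * FROM table1", "SELECT * FROM table2", "SELECT * FROM table3"]
       for query in queries:
           database_query.submit(query)

   if __name__ == "__main__":
       my_flow()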
This ensures that you limit the maximum number of concurrent tasks and avoid overstressing resources.
3. Iterate Properly Over Inputs in Asynchronous Flows: Assign the `receive_input` iterator to a variable outside the loop to maintain your position across iterations:
python
   from prefect import flow, get_client
   from prefect.deployments.deployments import run_deployment
   from prefect.input.run_input import receive_input

   EXIT_SIGNAL = "__EXIT__"

   @flow
   async def sender():
       greeter_flow_run = await run_deployment("greeter/send-receive", timeout=0, as_subflow=False)
       client = get_client()
       # Create the iterator once, outside any loop, so the position is kept.
       receiver = receive_input(str, with_metadata=True)
For more details and resources, you could refer to the Prefect documentation on global concurrency limits. If the problem persists despite these adjustments, you might need to dive deeper into your specific handling of streams and threading with `asyncio`. I hope this helps. If you need further assistance, feel free to ask.
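To make the assertion in the traceback concrete, here is a minimal sketch that triggers the same message outside of Prefect. It only uses `asyncio.StreamReader` from the standard library; the function name `reproduce_feed_after_eof` is made up for illustration, and the sketch assumes Python is not run with `-O` (which strips `assert` statements). It does not show what your flow is doing, only the condition the stream checks for.

```python
import asyncio


async def reproduce_feed_after_eof() -> str:
    """Feed data into a StreamReader after EOF and return the assertion text."""
    reader = asyncio.StreamReader()
    reader.feed_data(b"accepted: 0\n")  # data before EOF is accepted
    reader.feed_eof()  # the stream is now marked as ended
    try:
        # Any further data trips the assertion seen in the traceback.
        reader.feed_data(b"accepted: 1\n")
    except AssertionError as exc:
        return str(exc)
    return "no assertion raised"


if __name__ == "__main__":
    print(asyncio.run(reproduce_feed_after_eof()))
```

In the subprocess case from the traceback, the data comes from a pipe callback rather than a direct call, so a likely place to look is whatever marks the reader as finished while the child process is still writing.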
a
@Marvin is receiver only for use with async?
m
The `receive_input` function in Prefect is designed to iterate over all known inputs to a specific flow run. By default, it provides an iterator that can be used in both synchronous and asynchronous contexts. The key distinction comes from how you handle the flow itself.
### Synchronous Example
You can use `receive_input` in synchronous flows without any issue, just by calling `next()` explicitly to get the next input:
python
from prefect import flow
from prefect.input.run_input import receive_input

@flow
def my_sync_flow():
    receiver = receive_input(str)
    while True:
        try:
            input_value = next(receiver)
            # Process the input_value
        except StopIteration:
            break

if __name__ == "__main__":
    my_sync_flow()
### Asynchronous Example
In an asynchronous scenario, you might use `async for` to iterate over the inputs:
python
from prefect import flow, get_client
from prefect.deployments.deployments import run_deployment
from prefect.input.run_input import receive_input

EXIT_SIGNAL = "__EXIT__"

@flow
async def sender():
    greeter_flow_run = await run_deployment("greeter/send-receive", timeout=0, as_subflow=False)
    client = get_client()
    receiver = receive_input(str, with_metadata=True)
    async for input_value in receiver:
        # Process the input_value
        pass  # placeholder so the loop body is valid Python
### Documentation
For more information, you can refer to the Prefect documentation on creating interactive workflows.
### Summary
In brief, `receive_input` can be used both synchronously and asynchronously. Ensure that your flow's handling of inputs is consistent with its synchronous or asynchronous nature. If you're doing something asynchronously, remember to use the `async` keywords appropriately. If you have any more questions or need further details, feel free to ask!
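As a rough illustration of how one object can serve both `next()` and `async for`, here is a small sketch in plain Python. `DualModeReceiver`, `consume_sync`, and `consume_async` are hypothetical names invented for this example; this is not Prefect's implementation, just the two iterator protocols side by side.

```python
import asyncio
from collections import deque


class DualModeReceiver:
    """Hypothetical stand-in that supports both iteration protocols."""

    def __init__(self, items):
        self._items = deque(items)

    # Synchronous protocol: used by next() and plain for loops.
    def __iter__(self):
        return self

    def __next__(self):
        if not self._items:
            raise StopIteration
        return self._items.popleft()

    # Asynchronous protocol: used by async for.
    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._items:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # yield control, as a real poll would
        return self._items.popleft()


def consume_sync(receiver):
    return [value for value in receiver]


async def consume_async(receiver):
    return [value async for value in receiver]


if __name__ == "__main__":
    print(consume_sync(DualModeReceiver(["a", "b"])))
    print(asyncio.run(consume_async(DualModeReceiver(["a", "b"]))))
```

The point of the sketch is that the calling code, not the receiver, decides which protocol is used: a synchronous flow drives `__next__`, an `async` flow drives `__anext__`.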
a
@Marvin in `def my_sync_flow():` can I insert any function instead of `input_value = next(receiver)`?
m
Yes, you can insert any function call or logic in place of `input_value = next(receiver)` as long as it properly processes or interacts with the iterator `receiver`. The key is that this section of the code handles the retrieval of the next input value from the iterator.
### Example with Custom Function
Let's say you have a custom function `process_input` to handle the input values:
python
def process_input(input_value):
    print(f"Processing input: {input_value}")
    # Add your custom logic here
You can call this function inside your loop:
python
from prefect import flow
from prefect.input.run_input import receive_input

def process_input(input_value):
    print(f"Processing input: {input_value}")
    # Add your custom logic here

@flow
def my_sync_flow():
    receiver = receive_input(str)
    while True:
        try:
            input_value = next(receiver)
            process_input(input_value)
        except StopIteration:
            break

if __name__ == "__main__":
    my_sync_flow()
### Integrating Additional Logic
You can also add more complex logic, error handling, or other function calls as needed. Here's an example that includes error handling and logging:
python
import logging

from prefect import flow
from prefect.input.run_input import receive_input

def process_input(input_value):
    logging.info(f"Processing input: {input_value}")
    # Add your custom logic here
    if input_value == "error":  # Placeholder for actual condition
        raise ValueError("An error occurred while processing input")

@flow
def my_sync_flow():
    receiver = receive_input(str)
    while True:
        try:
            input_value = next(receiver)
            process_input(input_value)
        except ValueError as e:
            logging.error(f"Error processing input: {e}")
        except StopIteration:
            logging.info("No more inputs to process.")
            break

if __name__ == "__main__":
    my_sync_flow()
### Summary
Feel free to replace `input_value = next(receiver)` with any custom function or logic needed for your specific application. The key is to ensure that the replacement properly retrieves the next input when needed and handles it appropriately. Let me know if you need further assistance!