Hi All, Would anyone have experience passing `io.S...
# prefect-community
l
Hi All, Would anyone have experience passing
io.StringIO
objects between tasks in prefect 2.0? Here's a minimal example I'm trying to get to work but getting weird results, appreciate any help on this.
import io
from prefect import task, flow

@task
def string_to_io():
    return io.StringIO("hello prefect")

@task
def consume(string_io):
    print(type(string_io))
    return string_io.read()

@flow
def test():
    # s = string()
    str_io = string_to_io()
    r = consume(str_io)
    print(r)

test()
when I run it I get
19:38:59.511 | INFO    | prefect.engine - Created flow run 'congenial-longhorn' for flow 'test'
19:38:59.708 | INFO    | Flow run 'congenial-longhorn' - Created task run 'string_to_io-78a25967-0' for task 'string_to_io'
19:38:59.709 | INFO    | Flow run 'congenial-longhorn' - Executing 'string_to_io-78a25967-0' immediately...
19:38:59.768 | INFO    | Task run 'string_to_io-78a25967-0' - Finished in state Completed()
19:38:59.789 | INFO    | Flow run 'congenial-longhorn' - Created task run 'consume-884672dc-0' for task 'consume'
19:38:59.790 | INFO    | Flow run 'congenial-longhorn' - Executing 'consume-884672dc-0' immediately...
<class 'list'>
19:38:59.828 | ERROR   | Task run 'consume-884672dc-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 1222, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/lukasz/projects/zlotyryjek/crap.py", line 18, in consume
    return string_io.read()
AttributeError: 'list' object has no attribute 'read'
So for some reason
stringIO
is converted to an empty list 🤔
j
This is rather interesting. I’ve not worked with
stringIO
but I’m going to run your test and see if I can make something move…
👍 1
I have NOT been able to figure out why this is happening. I have created a function that returns an
io.StringIO
object. I have checked the type of the object returned and it doesn’t turn it into a list. I am not sure why it is doing that when wrapped in a
@task
decorator. Most of my flows are built returning complex objects and passing them to the next step…
@Zanie I posted this in the wrong thread earlier, apologies for the confusion this may have caused! … I’m not sure you’re the right one to get eyes on this… but I know you would know who if it wasn’t you. Below is the code I have tried when attempting to help figure out why you cannot return an instance of an
io.StringIO
object from a task and use it as input into the next task/flow. I have confirmed that you can (without it being decorated with
@task
) return an instance of
StringIO
in normal Python. I did just figure out that if I decorate the function with
@flow
I get different output than if I decorate them with
@task
… It’s closer to working as a flow (I think) as I get a list with the value of the string within it instead of an empty list. If you change the
consume
and
string_to_io
to
@task
it will give you different output. I’m running locally by executing
python prefect_playground.py
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import io
from prefect import task, flow, get_run_logger


@flow
def string_to_io():
    logger = get_run_logger()
    sio = io.StringIO("hello prefect")
    logger.info(f"**** str to io **** {sio.read()}")
    # Below I output what the value would look like if I tried to cast it to a list (empty list is the result)
    logger.info(f"**** **** {list(sio)}")
    sio.seek(0)
    # Other returns I have tried to see if they get changed...
    # return sio.readable() ... True
    # return {"sio": sio} ... {"sio": []}
    return sio


@flow
def consume(string_io):
    logger = get_run_logger()
    logger.info(f"******** str_io type: {type(string_io)}, has attr read: {hasattr(string_io, 'read')}, value: {string_io}")
    return string_io.read()


@flow
def test():
    logger = get_run_logger()
    # s = string()
    str_io = string_to_io()
    r = consume(str_io)

    logger.info(f"{r=}")


if __name__ == "__main__":
    test()
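For what it's worth, both outputs seen in this thread (a list holding the string, and an empty list) match what plain Python gives when a `StringIO` is iterated, with no Prefect involved. A minimal sketch follows; whether Prefect's result handling iterates the object this way is an assumption, not something confirmed here.

```python
import io

sio = io.StringIO("hello prefect")

# Iterating a StringIO yields its lines, so list() succeeds instead of raising.
first_pass = list(sio)

# The cursor is now at the end, so a second pass finds nothing left to yield.
second_pass = list(sio)

# Rewinding makes the content visible to iteration again.
sio.seek(0)
after_rewind = list(sio)

print(first_pass, second_pass, after_rewind)
```

So anything that treats the object as a generic iterable and rebuilds it would end up with a list, and the list is empty whenever the cursor already sits at the end.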
z
Hm. Whenever you run into weirdness like this I would recommend using the
prefect.utilities.annotations.quote
utility to get past it quickly.
This is definitely a bug that we should fix though, I presume this is an issue with
visit_collection
or
return_value_to_state
.
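Until a fix lands, one possible way around it is to hand a plain string from one task to the next and rebuild the `StringIO` on the receiving side. This is only a sketch: it assumes plain `str` values pass between tasks unchanged, and `string_to_text` is a hypothetical name, not something from the code earlier in the thread.

```python
import io

# In a real flow these two functions would carry the @task decorator;
# they are left undecorated here so the sketch runs without Prefect.


def string_to_text() -> str:
    # Hypothetical producer: build the buffer as usual...
    buffer = io.StringIO()
    buffer.write("hello prefect")
    # ...but hand over the plain string rather than the file-like object.
    return buffer.getvalue()


def consume(text: str) -> str:
    # Rebuild the file-like object on the receiving side.
    string_io = io.StringIO(text)
    return string_io.read()


result = consume(string_to_text())
print(result)
```

`getvalue()` returns the whole buffer regardless of the cursor position, so the producer does not need to `seek(0)` first.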
l
Thanks for looking into this @Zanie and @Jarvis Stubblefield. Would you mind sharing how
prefect.utilities.annotations.quote
would help in this case?
z
Ah in theory
@task
def string_to_io():
    return quote(io.StringIO("hello prefect"))
would prevent us from altering the contents of anything inside the
quote
but I tried it and got the same error
l
OK, got it
j
Looks like the PR above has been merged so it should be in the next release I imagine. @Lukasz Mentel I hope this helps!!
l
Excellent, I'm really impressed how quickly this was addressed, kudos @Jarvis Stubblefield & @Zanie 🙌
🎉 1