Lukasz Mentel
11/07/2022, 6:44 PM
… io.StringIO objects between tasks in prefect 2.0? Here's a minimal example I'm trying to get to work, but I'm getting weird results. I'd appreciate any help on this.
import io
from prefect import task, flow


@task
def string_to_io():
    return io.StringIO("hello prefect")


@task
def consume(string_io):
    print(type(string_io))
    return string_io.read()


@flow
def test():
    # s = string()
    str_io = string_to_io()
    r = consume(str_io)
    print(r)


test()
when I run it I get
19:38:59.511 | INFO | prefect.engine - Created flow run 'congenial-longhorn' for flow 'test'
19:38:59.708 | INFO | Flow run 'congenial-longhorn' - Created task run 'string_to_io-78a25967-0' for task 'string_to_io'
19:38:59.709 | INFO | Flow run 'congenial-longhorn' - Executing 'string_to_io-78a25967-0' immediately...
19:38:59.768 | INFO | Task run 'string_to_io-78a25967-0' - Finished in state Completed()
19:38:59.789 | INFO | Flow run 'congenial-longhorn' - Created task run 'consume-884672dc-0' for task 'consume'
19:38:59.790 | INFO | Flow run 'congenial-longhorn' - Executing 'consume-884672dc-0' immediately...
<class 'list'>
19:38:59.828 | ERROR | Task run 'consume-884672dc-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/prefect/engine.py", line 1222, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/lukasz/.cache/pypoetry/virtualenvs/zlotyryjek-eFYuV0Po-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/lukasz/projects/zlotyryjek/crap.py", line 18, in consume
return string_io.read()
AttributeError: 'list' object has no attribute 'read'
So for some reason StringIO is converted to an empty list 🤔
Jarvis Stubblefield
11/07/2022, 7:33 PM
… stringIO but I’m going to run your test and see if I can make something move…
Jarvis Stubblefield
11/08/2022, 2:34 AM
… io.StringIO object. I have checked the type of the object returned and it doesn’t turn it into a list. I am not sure why it is doing that when wrapped in a @task decorator. Most of my flows are built returning complex objects and passing them to the next step…
Jarvis Stubblefield
11/09/2022, 9:10 AM
… io.StringIO object from a task and use it as input into the next task/flow. I have confirmed that you can (without it being decorated with @task) return an instance of StringIO in normal Python.
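The undecorated baseline described here can be sketched as follows. This is a minimal sketch that reuses the function names from the original example with the Prefect decorators removed; `run` is a name assumed here for illustration only.

```python
import io


def string_to_io() -> io.StringIO:
    # Same body as in the thread's example, but without the @task decorator.
    return io.StringIO("hello prefect")


def consume(string_io: io.StringIO) -> str:
    # In plain Python the argument is still a StringIO, so .read() works.
    return string_io.read()


def run() -> str:
    # Hypothetical stand-in for the thread's test() flow, without Prefect.
    str_io = string_to_io()
    return consume(str_io)


result = run()
print(type(string_to_io()).__name__, result)
```

With no Prefect involved, the object keeps its type from producer to consumer, which suggests the conversion to a list happens somewhere in the decorated call path.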
I did just figure out that if I decorate the function with @flow I get different output than if I decorate them with @task… It’s closer to working as a flow (I think) as I get a list with the value of the string within it instead of an empty list. If you change the consume and string_to_io to @task it will give you different output.
I’m running locally by executing python prefect_playground.py
# -*- coding: utf-8
from __future__ import unicode_literals

import io

from prefect import task, flow, get_run_logger


@flow
def string_to_io():
    logger = get_run_logger()
    sio = io.StringIO("hello prefect")
    logger.info(f"**** str to io **** {sio.read()}")
    # Below I output what the value would look like if I tried to cast it to a list (empty list is the result)
    logger.info(f"**** **** {list(sio)}")
    sio.seek(0)
    # Other returns I have tried to see if they get changed...
    # return sio.readable() ... True
    # return {"sio": sio} ... {"sio": []}
    return sio


@flow
def consume(string_io):
    logger = get_run_logger()
    logger.info(f"******** str_io type: {type(string_io)}, has attr read: {hasattr(string_io, 'read')}, value: {string_io}")
    return string_io.read()


@flow
def test():
    logger = get_run_logger()
    # s = string()
    str_io = string_to_io()
    r = consume(str_io)
    logger.info(f"{r=}")


if __name__ == "__main__":
    test()
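One possible explanation for the list results above, sketched in plain Python with no Prefect involved: a StringIO is itself an iterator over its lines, so any code that walks it like a collection gets a list of lines and leaves the read position at the end. Whether Prefect's traversal of task inputs and return values does exactly this is an assumption here, not something the thread confirms. The helpers `to_text` and `from_text` are hypothetical names used only to illustrate passing a plain string between steps instead of the buffer.

```python
import io
from collections.abc import Iterator

sio = io.StringIO("hello prefect")

# StringIO implements __iter__ and __next__, so it counts as an iterator.
is_iterator = isinstance(sio, Iterator)

# Iterating yields the lines and moves the read position to the end.
first_pass = list(sio)

# A second pass (or a pass right after .read()) finds nothing left to yield.
second_pass = list(sio)

# Rewinding makes the content visible to iteration again.
sio.seek(0)
after_rewind = list(sio)

print(is_iterator, first_pass, second_pass, after_rewind)


def to_text(buffer: io.StringIO) -> str:
    """Hypothetical helper: return the full contents as a plain str."""
    # getvalue() returns everything regardless of the current read position.
    return buffer.getvalue()


def from_text(text: str) -> io.StringIO:
    """Hypothetical helper: rebuild a fresh buffer on the consuming side."""
    return io.StringIO(text)


restored = from_text(to_text(sio))
print(restored.read())
```

If the iterator behaviour is indeed the cause, it would account for both observations in the thread: a one-element list when the buffer is walked once from the start, and an empty list when it has already been read or walked before.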
Zanie
… prefect.utilities.annotations.quote utility to get past it quickly.
Zanie
… visit_collection or return_value_to_state.
Lukasz Mentel
11/09/2022, 6:29 PM
… prefect.utilities.annotations.quote help in this case?
Zanie
@task
def string_to_io():
    return quote(io.StringIO("hello prefect"))

would prevent us from altering the contents of anything inside the quote
but I tried it and got the same error
Lukasz Mentel
11/09/2022, 6:36 PM
Zanie
Jarvis Stubblefield
11/09/2022, 10:21 PM
Lukasz Mentel
11/10/2022, 1:58 PM