https://prefect.io logo
Title
j

Jean-Michel Provencher

01/16/2023, 6:54 PM
Hello, in the documentation, it explains that Task cannot be run inside other Task and one of the workaround is calling the task using .fn(). However, when using .fn() with a task that returns a result, I get a return value that is a coroutine instead of the actual value I'm looking for. Any way to get the resulting object instead of a co-routine using .fn() ? Otherwise, what are other alternatives?
a

Anna Geller

01/16/2023, 6:57 PM
when you use .fn(), you don't use Prefect features, you are just calling the python function:
from prefect import task, flow


@task
def special_data():
    return 42


@task
def extract_data():
    return special_data.fn()


@flow(log_prints=True)
def get_data():
    x = extract_data()
    print(x)


if __name__ == "__main__":
    get_data()
j

Jean-Michel Provencher

01/16/2023, 6:58 PM
it's weird, because calling
read_secret.fn(
            secret_name=A_SECRET_NAME),
          aws_credentials=aws_credentials,
            )
return a coroutine
that might be because it's an async function.
n

Nate

01/16/2023, 7:50 PM
that makes sense, you'll just need to await your
read_secret.fn
call
In [2]: async def read_secret():
   ...:     return 42
   ...:

In [3]: read_secret()
Out[3]: <coroutine object read_secret at 0x7fa7004f5170>

In [4]: await read_secret()
Out[4]: 42
👍 1
j

Jean-Michel Provencher

01/16/2023, 8:57 PM
yeah, but that makes your flow async right ? I haven't been able to find good example on how to manage async flows in the documentation.
n

Nate

01/16/2023, 9:35 PM
async flows work the same way as async python functions, you'll just have to await async function / task calls that you make in your async flow and then call your flow like an async function: • using
asyncio.run(my_async_flow())
if you don't already have an event loop • just
await my_async_flow()
if you're somewhere that already has an event loop (like ipython in example below)
In [19]: async def read_secret() -> int:
    ...:     return 42

In [20]: @flow(log_prints=True)
    ...: async def my_async_flow():
    ...:     print(await read_secret())

In [21]: await my_async_flow()
15:32:02.584 | INFO    | prefect.engine - Created flow run 'groovy-penguin' for flow 'my-async-flow'
15:32:03.889 | INFO    | Flow run 'groovy-penguin' - 42
15:32:04.113 | INFO    | Flow run 'groovy-penguin' - Finished in state Completed()