https://prefect.io logo
#prefect-community
Title
# prefect-community
t

Tom Matthews

07/04/2022, 9:16 AM
Context: I have a number of tasks that are essentially API calls which can be run in parallel and take ~ 1 second each but every second of latency saved is very important. I’d like to use Prefect in production and take advantage of the native async support, but async support is only available in prefect 2.0 and that is not recommended in production. Is there any recommendations here, would it be safe to use prefect 2 in Production, or is there anyway to use asyncio in prefect 1.0 (I couldn’t find any documentation for this), or would it be a good idea to just use the DaskExecutor in prefect 1.0 with the
threads
scheduler? 🙏
1
r

Ron Levi

07/04/2022, 9:25 AM
For Prefect 1.0: what I suggest and we use is to wrap your tasks with a custom decorator based on prefect's task (you can call it async_task). the decorating function will run the given function using asyncio. Here's a basic outline snippet:
Copy code
def async_task(fn=None, **kwargs):
	if fn is None:
        return lambda fn: async_task(fn, **kwargs)
    @task(**kwargs)
    @wraps(fn)
    def inner(*args, **kwargs):
        asyncio.run(fn(*args, **kwargs)) 
	return inner
t

Tom Matthews

07/04/2022, 9:35 AM
Ron you legend! Thanks so much, I’ll have a play around with getting that working now 👌
🙌 2
Hey Ron, I’m trying to get an example up and running with your suggested decorator, but still hitting the same error associated with using async with the prefect Flow context manager:
Copy code
import asyncio
from functools import wraps

from prefect import Flow, task


def async_task(fn=None, **kwargs):
    @task(**kwargs)
    @wraps(fn)
    def inner(*args, **kwargs):
        asyncio.run(fn(*args, **kwargs))

    return inner


@async_task
async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")


@async_task
async def main():
    return await asyncio.gather(count(), count(), count())


async def run_flow():

    async with Flow("Test") as flow:
        result = await main()

    flow.run()


if __name__ == "__main__":
    asyncio.run(run_flow())
Can you see what i’m doing wrong here? Also, im pretty new to asyncio 😅
r

Ron Levi

07/04/2022, 10:00 AM
don't call asyncio run/gather outside of the decorator definition + wraped tasks shouldn't be with async signature (handled on the decorator level)
👍 1
t

Tom Matthews

07/04/2022, 11:07 AM
Sorry Ron, i’m struggling to get something working here, do you by any chance have a more concrete example you could share? No problem if not, appreciate your time!
a

Anna Geller

07/04/2022, 11:58 AM
tasks that are essentially API calls which can be run in parallel and take ~ 1 second each but every second of latency saved is very important
seems like a great use case for ConcurrentTaskRunner
prefect 2.0 and that is not recommended in production
it's not as clear-cut - it depends on your use case and what is meant by production really. Check this page for more details - and GA of Prefect 2.0 is just around the corner so I highly encourage you to use Prefect 2.0 for that use case rather than building workarounds with Prefect 1.0 now
r

Ron Levi

07/04/2022, 12:02 PM
@Tom Matthews** Some additional pointers from looking at your code: don't define Flow context manager as async (so you can remove 'await' in it) , no need to asyncio.run from main as well.
a

Anna Geller

07/04/2022, 12:03 PM
would it be safe to use prefect 2 in Production
Prefect 2.0 is an open-source product and with that, you have the same guarantees as with any OSS product - you are in charge of making that production-ready yourself. However, for Cloud 2.0 we do our best effort to avoid any data loss and when we introduce any breaking changes to the API, we always provide instructions on how you can adjust your flows or execution layer setup to match with the latest release, so it's not as scary as it seems to be. But for sure, you need to judge yourself based on your appetite for software and your use case/needs in your team
@Ron Levi thanks so much for sharing your examples 🙌
t

Tom Matthews

07/04/2022, 2:33 PM
Thank you Ron and Anna! That’s great to know about prefect 2 coming out of Beta imminently, i would much rather use the native support in prefect 2 if possible. I’m afraid my case would have to be implementing on our own AWS servers rather than using Prefect cloud due to privacy concerns (policy at Apple). @Anna Geller Could i ask, what is the difference between using the default
ConcurrentTaskRunner
and leveraging the native support for async concurrency? Would you make use of
asyncio
if you had concerns around thread safety?
a

Anna Geller

07/04/2022, 2:47 PM
Regarding your privacy concerns, please talk to sales@prefect.io before making any long term infrastructure decisions. We are open to providing dedicated infrastructure in Cloud 2.0 Concurrent task runner makes async easy - this task runner is essentially magic built by Michael, our async expert, which runs your tasks asynchronously even without you having to explicitly write async code
🔥 2