Ross Rochford

11/14/2020, 5:37 AM
Hi everyone, I would like to use Prefect with python-trio and I'm willing to develop the necessary integrations. I found some discussion on this topic, and according to it the best option is to add support for AnyIO. I think I can make progress on this with a little support. I guess a first step is to look at adapting the existing code for Twisted and asyncio?

Zanie

11/17/2020, 7:28 PM
Hi @Ross Rochford. I’m curious why you want to use trio?

Ross Rochford

11/19/2020, 1:43 PM
I much prefer using trio to asyncio, it gives much nicer concurrency primitives
So for any I/O-heavy workloads, it would be nice to have an integrated compute environment that handles in-process, inter-process and multi-node concurrency

Jim Crist-Harif

11/19/2020, 3:27 PM
Hi Ross, Prefect doesn't support async tasks for any async backend (asyncio, trio, or otherwise). Prefect is written in a way that assumes tasks are synchronous functions - parallelism/concurrency is achieved through the use of threads or processes. This doesn't stop you from using trio or asyncio to write tasks, but the concurrency benefits of using such a library would have to come from within a single prefect task (perhaps you write some trio code to concurrently download several items, then run all of that within a single prefect task). For asyncio this would look something like:
import time
import asyncio
from functools import wraps
from prefect import task, Flow


def asyncio_task(fn=None, **task_kwargs):
    """A version of `prefect.task` that works for `async def` functions"""

    # Support both `@asyncio_task` and `@asyncio_task(**task_kwargs)`
    if fn is None:
        return lambda fn: asyncio_task(fn, **task_kwargs)

    @task(**task_kwargs)
    @wraps(fn)
    def inner(*args, **kwargs):
        # Run the coroutine to completion on a fresh event loop
        return asyncio.run(fn(*args, **kwargs))

    return inner


@asyncio_task
async def multi_sleep(n1, n2):
    """Run two concurrent sleeps, showing concurrency within a single task"""
    start = time.time()
    await asyncio.gather(asyncio.sleep(n1), asyncio.sleep(n2))
    end = time.time()
    return end - start


with Flow("asyncio example") as flow:
    res = multi_sleep(3, 4)


out = flow.run()

print("Slept for %.1f seconds" % out.result[res].result)
And the output:
$ python test.py
[2020-11-19 09:26:59-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'asyncio example'
[2020-11-19 09:26:59-0600] INFO - prefect.TaskRunner | Task 'multi_sleep': Starting task run...
[2020-11-19 09:27:03-0600] INFO - prefect.TaskRunner | Task 'multi_sleep': Finished task run for task with final state: 'Success'
[2020-11-19 09:27:03-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Slept for 4.0 seconds
We've been thinking about adding some tools for doing this pattern in prefect itself (something similar to the `asyncio_task` decorator), but have been holding out for a bit while gathering use cases. TL;DR - prefect uses threads/processes for running tasks in parallel, but tasks are free to start an event loop inside them if they want to make use of some async library (like asyncio/trio).
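The TL;DR above can be illustrated without Prefect at all. The following is a minimal sketch using only the standard library: a thread pool stands in for Prefect's executor, and each "task" starts its own event loop. The names `run_sync` and `run_in_threads` are made up for this illustration and are not part of Prefect.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps


def run_sync(fn):
    """Wrap an `async def` function so it can be called like a normal function.

    Hypothetical helper: the same idea as `asyncio_task`, minus Prefect.
    """

    @wraps(fn)
    def inner(*args, **kwargs):
        # asyncio.run creates a new event loop, runs the coroutine, then closes the loop
        return asyncio.run(fn(*args, **kwargs))

    return inner


@run_sync
async def multi_sleep(n1, n2):
    """Concurrency inside one task: both sleeps overlap on one event loop."""
    start = time.monotonic()
    await asyncio.gather(asyncio.sleep(n1), asyncio.sleep(n2))
    return time.monotonic() - start


def run_in_threads():
    """Parallelism across tasks: each thread runs its own event loop."""
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(multi_sleep, 0.1, 0.2) for _ in range(2)]
        durations = [f.result() for f in futures]
    return durations, time.monotonic() - start


if __name__ == "__main__":
    durations, total = run_in_threads()
    print("Per-task durations:", durations)
    print("Total wall time: %.2f seconds" % total)
```

Each call to `multi_sleep` takes roughly as long as its longest sleep, and the two calls overlap because they run in separate threads.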

Ross Rochford

11/19/2020, 3:43 PM
Thank you for explaining that, Jim. I think what I was imagining was some kind of dedicated asyncio workers, distinct from standard workers. Regular workers would each consume an entire process (or thread), but asyncio workers would consume a fraction of a process.
So I/O-heavy tasks would be "lighter" tasks that can yield while they are waiting.
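To make the "fraction of a process" idea concrete, here is a small standard-library sketch (not something Prefect provides): many I/O-style coroutines share one thread and yield to each other while waiting. The function names `io_bound`, `run_many`, and `demo` are invented for the example.

```python
import asyncio
import threading
import time


async def io_bound(i, delay):
    """Pretend to wait on I/O; the `await` yields control to other coroutines."""
    await asyncio.sleep(delay)
    return i, threading.get_ident()


async def run_many(n, delay):
    # Schedule all coroutines on the same event loop and wait for all of them
    return await asyncio.gather(*(io_bound(i, delay) for i in range(n)))


def demo(n=200, delay=0.1):
    start = time.monotonic()
    results = asyncio.run(run_many(n, delay))
    elapsed = time.monotonic() - start
    # Collect the distinct thread ids the coroutines ran on
    thread_ids = {tid for _, tid in results}
    return len(results), len(thread_ids), elapsed


if __name__ == "__main__":
    count, threads, elapsed = demo()
    print("%d coroutines on %d thread(s) in %.2f seconds" % (count, threads, elapsed))
```

All coroutines run on a single thread, and the total time is far below the sum of the individual delays because the waits overlap.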

Jim Crist-Harif

11/19/2020, 3:46 PM
I agree that this would be potentially useful, but isn't currently supported and isn't a small change. Much of the internals of prefect assumes synchronous execution of tasks.
I'd recommend using threads for now, until you run into a measurable performance issue. Threads should perform just fine for many use cases.

Ross Rochford

11/19/2020, 3:50 PM
Ok, good to know. Yeah, I just really like working with trio and I haven't yet figured out good patterns for using it in a distributed context. I think I'll use prefect as inspiration and then adopt those patterns into my own asyncio task scheduler.

Chris White

11/19/2020, 4:54 PM
@Marvin archive “How to use asynchronous framework with Prefect”

Ernest

01/13/2021, 1:31 PM
Hi! @Jim Crist-Harif I would be interested in `asyncio_task` or a bit more documentation for asynchronous use cases. My use case: I'm using the pyppeteer library to automate browser tasks. Pyppeteer is asynchronous. The first task is to create a browser, then a new page, and then interact with that page. Every function call is asynchronous and has to be awaited. I have tried the method above and was able to create a browser instance, but haven't figured out how to pass it to other tasks. P.S. I'm new to Prefect, and so far it has been a breeze compared to Airflow or Luigi.
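One possible reason passing the browser between tasks is awkward: `asyncio.run` creates a new event loop per call and closes it afterwards, so an object tied to that loop generally cannot be reused by a later task. A hedged sketch of one workaround, consistent with the advice earlier in the thread, is to keep the whole browser session inside a single task and return only plain data. `FakeBrowser`, `FakePage`, and `launch` below are hypothetical stand-ins and do not reflect pyppeteer's actual API.

```python
import asyncio


class FakePage:
    """Hypothetical stand-in for a browser page; not pyppeteer's API."""

    def __init__(self, url):
        self.url = url

    async def title(self):
        await asyncio.sleep(0)  # pretend to talk to the browser
        return f"Title of {self.url}"


class FakeBrowser:
    """Hypothetical stand-in for an async browser handle."""

    def __init__(self):
        self.closed = False

    async def new_page(self, url):
        await asyncio.sleep(0)
        return FakePage(url)

    async def close(self):
        await asyncio.sleep(0)
        self.closed = True


async def launch():
    """Hypothetical launcher; a real library would start a browser here."""
    await asyncio.sleep(0)
    return FakeBrowser()


async def scrape_titles(urls):
    # Create, use, and close the browser within one event loop
    browser = await launch()
    try:
        pages = await asyncio.gather(*(browser.new_page(u) for u in urls))
        return await asyncio.gather(*(p.title() for p in pages))
    finally:
        await browser.close()


def scrape_titles_task(urls):
    """Synchronous entry point: the shape a wrapped task body would have."""
    return asyncio.run(scrape_titles(urls))


if __name__ == "__main__":
    print(scrape_titles_task(["https://example.com/a", "https://example.com/b"]))
```

Downstream tasks then receive the returned list of strings rather than a live browser object.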

Ross Rochford

01/13/2021, 1:34 PM
Hi Ernest, if you're open to using Selenium instead, I have an asyncio library that can connect to existing remote sessions, so a session can be shared by different processes or nodes
(not suggesting it's impossible with pyppeteer, I've never used it)

Ernest

01/13/2021, 1:39 PM
I have used Selenium. Pyppeteer's API is more suitable for my task, and by now I have a bunch of things developed around it. Just need to automate it :)