Hi everyone, I have a question about running `asyn...
# prefect-community
a
Hi everyone, I have a question about running `async` tasks.
from prefect import task, Flow

@task
async def extract():
    return [1,2,3]

@task
async def transform(some_list):
    transformed = [x + 1 for x in some_list]
    return transformed

@task
async def load(transformed):
    print('load somewhere')
    
with Flow('async testing') as flow:
    extract_nums = extract()
    transformed = transform(extract_nums)
    load(transformed)

flow.run()
This creates a flow and all, but how do I actually run this flow asynchronously? Posted in #random
n
Hi @Adisun Wheelock! Async tasks can be run using the `DaskExecutor`. You can read a bit more about that here: https://docs.prefect.io/core/concepts/engine.html#executors
a
Thanks for the quick response! I just followed that tutorial, and it modifies my code slightly:
from prefect.engine.executors import LocalDaskExecutor
from prefect import task, Flow

@task
async def extract():
    return [1,2,3]

@task
async def transform(some_list):
    transformed = [x + 1 for x in some_list]
    return transformed

@task
async def load(transformed):
    print('load somewhere')
    
with Flow('async testing') as flow:
    extract_nums = extract()
    transformed = transform(extract_nums)
    load(transformed)

executor = LocalDaskExecutor(address="tcp://10.0.0.166:8786")
flow.run(executor=executor)
Is there a more in-depth tutorial on using the `LocalDaskExecutor` with Python coroutines?
n
Great! We have an advanced tutorial on Dask deployment here: https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html#the-dask-executor - it doesn't mention any coroutines in particular as far as I remember but it should get you started with Local Dask
a
Yeah I saw that one, was hoping for an example of building a DAG with async tasks using `LocalDaskExecutor` 😕
I ended up subclassing `Task` to make my own `AsyncTask` that just runs the wrapped coroutine in an event loop.
n
Ah sorry, I misunderstood what you were looking for there. Can you help me understand your use case? In a linear DAG like your earlier example, the tasks need to be run sequentially whether they're asynchronous or not.
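A minimal plain-`asyncio` sketch (no Prefect involved) of the point above: in a linear chain each step has to wait for the previous result, so `async` buys no overlap, while independent branches can overlap. The `asyncio.sleep` calls are stand-ins for real I/O, and `linear_pipeline` / `independent_branches` are illustrative names only.

```python
import asyncio


async def extract():
    await asyncio.sleep(0.01)  # stand-in for an I/O call
    return [1, 2, 3]


async def transform(some_list):
    await asyncio.sleep(0.01)  # stand-in for an I/O call
    return [x + 1 for x in some_list]


async def linear_pipeline():
    # Each step awaits the previous result, so the steps cannot overlap.
    nums = await extract()
    return await transform(nums)


async def independent_branches():
    # Two extracts that do not depend on each other can run concurrently.
    return await asyncio.gather(extract(), extract())


linear_result = asyncio.run(linear_pipeline())
branch_results = asyncio.run(independent_branches())
```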
a
Linear is fine, I'm mostly just trying to find out whether Prefect supports Python's `async` syntax.
I know you can run tasks asynchronously with the Dask executor, but I am looking for Python `async` syntax support in Prefect.
n
Thanks @Adisun Wheelock - Prefect doesn't directly support `async` functions (and neither does Dask). We could potentially create a new Executor to run things in a single event loop but that doesn't exist atm.
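To illustrate what "a single event loop" could look like, here is a rough plain-`asyncio` sketch. It is not a Prefect Executor and uses no Prefect API; `SharedLoopRunner` and `fetch` are hypothetical names made up for this example. One loop runs in a background thread and every coroutine is submitted to it.

```python
import asyncio
import threading


class SharedLoopRunner:
    """Hypothetical helper: owns one event loop running in a background thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def submit(self, coro):
        # Schedule the coroutine on the shared loop from any thread;
        # returns a concurrent.futures.Future.
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def shutdown(self):
        # Ask the loop to stop, wait for the thread to exit, then release the loop.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fetch(n):
    await asyncio.sleep(0.01)  # stand-in for an I/O call
    return n * 2


runner = SharedLoopRunner()
futures = [runner.submit(fetch(n)) for n in (1, 2, 3)]
results = [f.result(timeout=5) for f in futures]
runner.shutdown()
```

All three coroutines share the same loop, so their waits overlap instead of each call spinning up a loop of its own.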
a
Okay, thanks! I think my `AsyncTask` wrapper will suffice for the time being.
n
Great! If you're feeling like it's not quite doing what you'd hoped, let us know!
r
Interesting problem. Is there any overhead in creating one event loop per task vs. one event loop per executor? I see the use case for async flows when the tasks perform exclusively IO operations (e.g. downloading files, calling external services). But it can also be common to need to mix sync and async tasks (e.g. doing some processing in one task and IO operations in a parallel one).
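One way to picture the mixed case, as a plain-`asyncio` sketch rather than anything Prefect-specific: blocking work is handed to the default thread pool with `loop.run_in_executor`, while a coroutine does the I/O on the event loop. `crunch` and `download` are hypothetical stand-ins.

```python
import asyncio
import time


def crunch(values):
    # Synchronous, blocking work (stand-in for real processing).
    time.sleep(0.01)
    return sum(v * v for v in values)


async def download(name):
    # Asynchronous I/O (stand-in for a real network call).
    await asyncio.sleep(0.01)
    return f"{name}: done"


async def main():
    loop = asyncio.get_running_loop()
    # Run the blocking function in the default thread pool so it does not
    # block the event loop while the download is in flight.
    processing = loop.run_in_executor(None, crunch, [1, 2, 3])
    return await asyncio.gather(processing, download("file-a"))


mixed_results = asyncio.run(main())
```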
a
@Adisun Wheelock any chance you could share your `AsyncTask`? Even if it might be as simple as `get_event_loop(); ...run_until_complete()`. There are a few ways one could cut that problem and every variant helps.
a
yeah it's basically just that. Here it is:
import asyncio

import prefect  # needed for prefect.core.task._validate_run_signature below
from prefect import Task


class AsyncTask(Task):
    def __init__(self, fn, name: str = None, **kwargs):
        if not callable(fn):
            raise TypeError("fn must be callable.")

        # set the name from the fn
        if name is None:
            name = getattr(fn, "__name__", type(self).__name__)

        prefect.core.task._validate_run_signature(fn)  # type: ignore
        self.fn = fn
        self.result = None
        super().__init__(name=name, **kwargs)

    def run(self, **kwargs):
        task_event_loop = asyncio.get_event_loop()
        self.result = task_event_loop.run_until_complete(self.fn(**kwargs))
        return self.result
a
got ya, thx!
Did you run into problems from overwriting `self.result` on later branches? I think it's used to store values wrapped in a Prefect `Result`.
a
I didn't in the tests I ran. But they were very minimal. A better solution would be to omit `self.result` entirely and just `return task_event_loop.run_until_complete(self.fn(**kwargs))`.
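A small sketch of that suggestion, written as a standalone function so it runs without Prefect: the coroutine's value is returned directly and nothing is stored on the instance. `run_coroutine_sync` is a hypothetical helper name. Creating a fresh loop per call is an assumption on my part, not what the snippet above does; it avoids the `RuntimeError` that `asyncio.get_event_loop()` raises in a non-main thread that has no loop set.

```python
import asyncio


def run_coroutine_sync(fn, **kwargs):
    """Hypothetical helper: run coroutine function `fn` to completion and return its value."""
    loop = asyncio.new_event_loop()  # fresh loop, so this also works in worker threads
    try:
        # Return the value directly instead of stashing it on an attribute.
        return loop.run_until_complete(fn(**kwargs))
    finally:
        loop.close()


async def transform(some_list):
    return [x + 1 for x in some_list]


transformed = run_coroutine_sync(transform, some_list=[1, 2, 3])
```

Inside a `Task` subclass, the body of `run` could be the same few lines with `self.fn` in place of `fn`.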