
Adisun Wheelock

04/22/2020, 4:33 PM
Hi everyone, I have a question about running async tasks.
from prefect import task, Flow

@task
async def extract():
    return [1,2,3]

@task
async def transform(some_list):
    transformed = [x + 1 for x in some_list]
    return transformed

@task
async def load(transformed):
    print('load somewhere')
    
with Flow('async testing') as flow:
    extract_nums = extract()
    transformed = transform(extract_nums)
    load(transformed)

flow.run()
This creates a flow, but how do I actually run it asynchronously?
Posted in #random
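For context, in plain Python calling an async def function only creates a coroutine object. Its body does not run until an event loop drives it, for example with asyncio.run. A minimal standard-library sketch, independent of Prefect, reusing the extract function from the example above:

```python
import asyncio
import inspect


async def extract():
    return [1, 2, 3]


# Calling the coroutine function does not execute its body;
# it only builds a coroutine object.
coro = extract()
print(inspect.iscoroutine(coro))  # True

# An event loop has to drive the coroutine to completion.
result = asyncio.run(coro)
print(result)  # [1, 2, 3]
```

This is presumably why the flow above does not behave as expected: if a task's run simply calls the async function, it hands back a coroutine object that nothing awaits.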

nicholas

04/22/2020, 4:41 PM
Hi @Adisun Wheelock! Async tasks can be run using the DaskExecutor. You can read a bit more about that here: https://docs.prefect.io/core/concepts/engine.html#executors

Adisun Wheelock

04/22/2020, 5:01 PM
Thanks for the quick response! I just followed that tutorial and modified my code slightly:
from prefect.engine.executors import LocalDaskExecutor
from prefect import task, Flow

@task
async def extract():
    return [1,2,3]

@task
async def transform(some_list):
    transformed = [x + 1 for x in some_list]
    return transformed

@task
async def load(transformed):
    print('load somewhere')
    
with Flow('async testing') as flow:
    extract_nums = extract()
    transformed = transform(extract_nums)
    load(transformed)

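# Note: address= is a DaskExecutor argument for connecting to an existing
# Dask scheduler; LocalDaskExecutor runs a local scheduler instead, so the
# address is likely unused here.
executor = LocalDaskExecutor(address="tcp://10.0.0.166:8786")
flow.run(executor=executor)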
Is there a more in-depth tutorial on using the LocalDaskExecutor with Python coroutines?

nicholas

04/22/2020, 5:21 PM
Great! We have an advanced tutorial on Dask deployment here: https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html#the-dask-executor
As far as I remember it doesn't mention coroutines specifically, but it should get you started with local Dask.

Adisun Wheelock

04/22/2020, 5:22 PM
Yeah, I saw that one; I was hoping for an example of building a DAG with async tasks using LocalDaskExecutor 😕
I ended up subclassing Task to make my own AsyncTask that just runs the wrapped coroutine in an event loop.
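The same idea of running a wrapped coroutine in an event loop can be sketched as a plain decorator, assuming one fresh event loop per call is acceptable. run_sync is a hypothetical name, not a Prefect API: it turns a coroutine function into an ordinary function that a synchronous caller, such as a task's run method, could call.

```python
import asyncio
import functools


def run_sync(coro_fn):
    """Wrap a coroutine function so it can be called synchronously.

    Each call starts a fresh event loop with asyncio.run, drives the
    coroutine to completion and returns its result.
    """
    @functools.wraps(coro_fn)
    def wrapper(*args, **kwargs):
        return asyncio.run(coro_fn(*args, **kwargs))
    return wrapper


@run_sync
async def transform(some_list):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return [x + 1 for x in some_list]


print(transform([1, 2, 3]))  # [2, 3, 4]
```

One limitation of this design: asyncio.run refuses to start if an event loop is already running in the calling thread, so the wrapper only suits callers that are themselves synchronous.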

nicholas

04/22/2020, 5:28 PM
Ah sorry, I misunderstood what you were looking for there. Can you help me understand your use case? In a linear DAG like your earlier example, the tasks need to be run sequentially whether they're asynchronous or not.
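To illustrate that point in plain asyncio (no Prefect involved): the three steps of the example flow, written as coroutines and run in a single event loop, still execute strictly in order, because each await needs the previous step's result.

```python
import asyncio


async def extract():
    return [1, 2, 3]


async def transform(some_list):
    return [x + 1 for x in some_list]


async def load(transformed):
    print("load somewhere", transformed)
    return len(transformed)


async def pipeline():
    # Each step depends on the previous step's output, so these awaits
    # run one after another even inside a single event loop.
    nums = await extract()
    transformed = await transform(nums)
    return await load(transformed)


count = asyncio.run(pipeline())  # prints: load somewhere [2, 3, 4]
```

Async only pays off when some steps are independent of each other and can wait on I/O at the same time.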

Adisun Wheelock

04/22/2020, 5:32 PM
Linear is fine; I'm mostly just looking for whether Prefect supports Python's async syntax.
I know you can run tasks asynchronously with the Dask executor, but I'm looking for support for Python's async syntax in Prefect.

nicholas

04/22/2020, 6:00 PM
Thanks @Adisun Wheelock! Prefect doesn't directly support async functions (and neither does Dask). We could potentially create a new Executor that runs things in a single event loop, but that doesn't exist at the moment.
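A rough sketch of what running things in a single event loop could look like, assuming every task is a coroutine function and the DAG is given as a dict mapping each node name to (function, list of upstream names). run_dag and this dict layout are hypothetical, not part of Prefect: each node awaits only its upstream nodes, so independent branches overlap while dependent ones wait.

```python
import asyncio
import functools


async def run_dag(dag):
    """Run all nodes of an acyclic `dag` concurrently in one event loop.

    `dag` maps a node name to (coroutine_function, [upstream names]).
    Each node is called with its upstream results as positional args.
    A cycle in `dag` would make the nodes wait on each other forever.
    """
    futures = {}

    async def run_node(name):
        fn, upstream = dag[name]
        # Wait only for this node's upstream nodes; nodes that do not
        # depend on each other make progress at the same time.
        args = await asyncio.gather(*(futures[u] for u in upstream))
        return await fn(*args)

    # Schedule every node up front. The node coroutines only start once
    # this function yields, so every entry exists before it is looked up.
    for name in dag:
        futures[name] = asyncio.ensure_future(run_node(name))

    results = await asyncio.gather(*futures.values())
    return dict(zip(futures, results))


async def fetch(n):
    await asyncio.sleep(0.1)  # pretend I/O
    return n


async def combine(a, b):
    return a + b


dag = {
    "a": (functools.partial(fetch, 1), []),
    "b": (functools.partial(fetch, 2), []),
    "total": (combine, ["a", "b"]),
}
results = asyncio.run(run_dag(dag))
print(results)  # {'a': 1, 'b': 2, 'total': 3}
```

Here "a" and "b" sleep concurrently, so the whole run takes roughly one sleep rather than two; a real executor would also need retries, state tracking and error handling, which this sketch leaves out.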

Adisun Wheelock

04/22/2020, 6:31 PM
Okay, thanks! I think my AsyncTask wrapper will suffice for the time being.

nicholas

04/22/2020, 6:32 PM
Great! If you're feeling like it's not quite doing what you'd hoped, let us know!

rmax

04/23/2020, 10:54 AM
Interesting problem. Is there overhead in creating one event loop per task versus one event loop per executor? I can see the use case for async flows when the tasks perform only I/O operations (e.g. downloading files or calling external services). But it's also common to need to mix sync and async tasks (e.g. doing some processing in one task and I/O operations in a parallel one).
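One common way to mix the two inside a single event loop, sketched with the standard library only: a blocking sync function is handed to a thread pool with loop.run_in_executor, while the I/O-bound coroutine runs on the loop itself, and asyncio.gather waits for both. The function names are illustrative stand-ins.

```python
import asyncio
import time


def process(data):
    # Synchronous, blocking work (stand-in for real processing).
    time.sleep(0.1)
    return sum(data)


async def download(url):
    # Stand-in for non-blocking I/O such as an HTTP request.
    await asyncio.sleep(0.1)
    return f"contents of {url}"


async def main():
    loop = asyncio.get_running_loop()
    # The sync function runs in the default thread pool so it does not
    # block the event loop; the coroutine runs on the loop concurrently.
    total, page = await asyncio.gather(
        loop.run_in_executor(None, process, [1, 2, 3]),
        download("https://example.com"),
    )
    return total, page


print(asyncio.run(main()))  # (6, 'contents of https://example.com')
```

On the overhead question: asyncio.run creates and closes a new loop on every call, so a loop-per-task design pays that setup cost once per task. Whether it matters depends on how short the tasks are; timing both variants with timeit is the reliable way to find out.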

Alexander Hirner

04/27/2020, 1:53 PM
@Adisun Wheelock any chance you could share your AsyncTask? Even if it's as simple as get_event_loop(); ...run_until_complete(). There are a few ways one could cut that problem, and every variant helps.

Adisun Wheelock

04/27/2020, 1:56 PM
Yeah, it's basically just that. Here it is:
import asyncio

import prefect
from prefect import Task


class AsyncTask(Task):
    """Task that wraps a coroutine function and runs it in an event loop."""

    def __init__(self, fn, name: str = None, **kwargs):
        if not callable(fn):
            raise TypeError("fn must be callable.")

        # set the name from the fn
        if name is None:
            name = getattr(fn, "__name__", type(self).__name__)

        # reuse Prefect's (private) check that fn has a supported signature
        prefect.core.task._validate_run_signature(fn)  # type: ignore
        self.fn = fn
        self.result = None
        super().__init__(name=name, **kwargs)

    def run(self, **kwargs):
        # drive the coroutine to completion in this thread's event loop
        task_event_loop = asyncio.get_event_loop()
        self.result = task_event_loop.run_until_complete(self.fn(**kwargs))
        return self.result
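One caveat worth checking, hedged because it depends on how the executor runs tasks: asyncio.get_event_loop() only creates a loop automatically in the main thread, so in a worker thread (for example under a threaded Dask scheduler) it can raise RuntimeError. A sketch of an alternative run body that uses asyncio.run, which creates and closes its own loop, demonstrated from a ThreadPoolExecutor rather than Prefect. run_coroutine is a hypothetical helper name, and it returns the value directly instead of storing it on the instance.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def transform(some_list):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return [x + 1 for x in some_list]


def run_coroutine(fn, **kwargs):
    # asyncio.run creates a new event loop, runs the coroutine and closes
    # the loop, so it works in threads that have no loop of their own.
    return asyncio.run(fn(**kwargs))


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(run_coroutine, transform, some_list=[i]) for i in range(3)]
    results = [f.result() for f in futures]

print(results)  # [[1], [2], [3]]
```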

Alexander Hirner

04/27/2020, 2:01 PM
Got ya, thx!
Did you run into problems with overwriting self.result on later branches? I think it's used to store values wrapped in Prefect's Result.

Adisun Wheelock

04/27/2020, 2:31 PM
I didn't in the tests I ran, but they were very minimal. A better solution would be to omit self.result entirely and just
return task_event_loop.run_until_complete(self.fn(**kwargs))