Adisun Wheelock
04/22/2020, 4:33 PM
async tasks.
from prefect import task, Flow

@task
async def extract():
    return [1, 2, 3]

@task
async def transform(some_list):
    transformed = [x + 1 for x in some_list]
    return transformed

@task
async def load(transformed):
    print('load somewhere')

with Flow('async testing') as flow:
    extract_nums = extract()
    transformed = transform(extract_nums)
    load(transformed)

flow.run()
This creates a flow and all, but how do I actually run this flow asynchronously?
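A likely reason the tasks above do no real work when they are called synchronously: calling an async def function does not execute its body, it only returns a coroutine object that something must await or hand to an event loop. The sketch below uses only the standard library (Prefect is not involved) and reuses extract from the snippet above; pending, is_coroutine, and result are illustrative names, not part of any API.

```python
import asyncio
import inspect


async def extract():
    return [1, 2, 3]


# A plain call builds a coroutine object; the function body has not run yet.
pending = extract()
is_coroutine = inspect.iscoroutine(pending)
pending.close()  # discard it explicitly to avoid a "never awaited" warning

# Handing the coroutine to an event loop is what actually runs the body.
result = asyncio.run(extract())
```

Any caller that treats the task function as an ordinary synchronous callable would receive the coroutine object rather than the list.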
nicholas
DaskExecutor. You can read a bit more about that here:
https://docs.prefect.io/core/concepts/engine.html#executors
Adisun Wheelock
04/22/2020, 5:01 PM
from prefect.engine.executors import LocalDaskExecutor
from prefect import task, Flow

@task
async def extract():
    return [1, 2, 3]

@task
async def transform(some_list):
    transformed = [x + 1 for x in some_list]
    return transformed

@task
async def load(transformed):
    print('load somewhere')

with Flow('async testing') as flow:
    extract_nums = extract()
    transformed = transform(extract_nums)
    load(transformed)

executor = LocalDaskExecutor(address="tcp://10.0.0.166:8786")
flow.run(executor=executor)
Is there a more in-depth tutorial on using the LocalDaskExecutor with Python coroutines?
nicholas
Adisun Wheelock
04/22/2020, 5:22 PM
LocalDaskExecutor 😕
Adisun Wheelock
04/22/2020, 5:23 PM
Task to make my own AsyncTask that just runs the wrapped coroutine in an event loop.
nicholas
Adisun Wheelock
04/22/2020, 5:32 PM
async syntax.
Adisun Wheelock
04/22/2020, 5:33 PM
async syntax support in Prefect.
nicholas
async functions (and neither does Dask). We could potentially create a new Executor to run things in a single event loop, but that doesn't exist atm.
Adisun Wheelock
04/22/2020, 6:31 PM
AsyncTask wrapper will suffice for the time being.
nicholas
rmax
04/23/2020, 10:54 AM
Alexander Hirner
04/27/2020, 1:53 PM
AsyncTask? Even if it might be as simple as get_event_loop(); ...run_until_complete(). There are a few ways one could cut that problem and every variant helps.
Adisun Wheelock
04/27/2020, 1:56 PM
import asyncio

import prefect  # needed for the prefect.core.task reference below
from prefect import Task

class AsyncTask(Task):
    def __init__(self, fn, name: str = None, **kwargs):
        if not callable(fn):
            raise TypeError("fn must be callable.")
        # set the name from the fn
        if name is None:
            name = getattr(fn, "__name__", type(self).__name__)
        prefect.core.task._validate_run_signature(fn)  # type: ignore
        self.fn = fn
        self.result = None
        super().__init__(name=name, **kwargs)

    def run(self, **kwargs):
        # run the wrapped coroutine to completion on the current event loop
        task_event_loop = asyncio.get_event_loop()
        self.result = task_event_loop.run_until_complete(self.fn(**kwargs))
        return self.result
Alexander Hirner
04/27/2020, 2:01 PM
Alexander Hirner
04/27/2020, 2:03 PM
Result
Adisun Wheelock
04/27/2020, 2:31 PM
self.results entirely and just return task_event_loop.run_until_complete(self.fn(**kwargs))
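A minimal sketch of that simplification, with the stored result dropped and the coroutine's return value handed straight back. To keep it runnable without Prefect installed, AsyncCallable is a hypothetical stand-in that has no Task base class, so it shows the wrapping pattern only and is not a drop-in replacement for AsyncTask. It also swaps get_event_loop() for asyncio.run(), which creates and closes a fresh event loop on every call. That is an assumption about what fits best here, made because get_event_loop() raises RuntimeError in a non-main thread that has no loop set.

```python
import asyncio


class AsyncCallable:
    """Hypothetical stand-in for AsyncTask, without the Prefect Task base class."""

    def __init__(self, fn):
        if not asyncio.iscoroutinefunction(fn):
            raise TypeError("fn must be a coroutine function.")
        self.fn = fn

    def run(self, **kwargs):
        # No stored result: run the coroutine on a fresh loop and return its value.
        return asyncio.run(self.fn(**kwargs))


async def transform(some_list):
    await asyncio.sleep(0)  # placeholder for real awaitable work
    return [x + 1 for x in some_list]


transformed = AsyncCallable(transform).run(some_list=[1, 2, 3])
```

One caveat: asyncio.run() raises RuntimeError when called from inside an already running event loop, so this variant only suits callers that are themselves synchronous.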