Ross Rochford
11/14/2020, 5:37 AM
Zanie
11/17/2020, 7:28 PM
Ross Rochford
11/19/2020, 1:43 PM
Jim Crist-Harif
11/19/2020, 3:27 PM
import time
import asyncio
from functools import wraps

from prefect import task, Flow


def asyncio_task(fn=None, **kwargs):
    """A version of `prefect.task` that works for `async def ` functions"""
    if fn is None:
        return lambda fn: asyncio_task(fn, **kwargs)

    @task(**kwargs)
    @wraps(fn)
    def inner(*args, **kwargs):
        return asyncio.run(fn(*args, **kwargs))

    return inner


@asyncio_task
async def multi_sleep(n1, n2):
    """Run two concurrent sleeps, showing concurrency within a single task"""
    start = time.time()
    await asyncio.gather(asyncio.sleep(n1), asyncio.sleep(n2))
    end = time.time()
    return end - start


with Flow("asyncio example") as flow:
    res = multi_sleep(3, 4)

out = flow.run()
print("Slept for %.1f seconds" % out.result[res].result)
$ python test.py
[2020-11-19 09:26:59-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'asyncio example'
[2020-11-19 09:26:59-0600] INFO - prefect.TaskRunner | Task 'multi_sleep': Starting task run...
[2020-11-19 09:27:03-0600] INFO - prefect.TaskRunner | Task 'multi_sleep': Finished task run for task with final state: 'Success'
[2020-11-19 09:27:03-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Slept for 4.0 seconds
asyncio_task decorator), but have been holding out for a bit while gathering use cases.
TL;DR - prefect uses threads/processes for running tasks in parallel, but tasks are free to start an event loop inside them if they want to make use of some async library (like asyncio/trio).
Ross Rochford
11/19/2020, 3:43 PM
Jim Crist-Harif
11/19/2020, 3:46 PM
Ross Rochford
11/19/2020, 3:50 PM
Chris White
11/19/2020, 4:54 PM
Marvin
11/19/2020, 4:54 PM
Ernest
01/13/2021, 1:31 PM
asyncio_task or a bit more documentation for asynchronous use cases.
My use case: I'm using the pyppeteer library to automate browser tasks. Pyppeteer is asynchronous. The first task creates a browser, then a new page, and then interacts with that page. Every call is asynchronous and has to be awaited. I've tried the method above and was able to create a browser instance, but haven't figured out how to pass it to other tasks.
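One possible way to approach the use case above, sketched under stated assumptions: since `asyncio.run` creates a fresh event loop and closes it when the coroutine returns, an object created inside one wrapped task is likely unusable in a later one. Keeping the browser, the page, and the interactions inside a single coroutine avoids passing the browser between tasks at all. `FakeBrowser`, `FakePage`, `run_async`, and `scrape_title` are hypothetical names for illustration and are not pyppeteer's or Prefect's API; `run_async` mirrors the `asyncio_task` wrapper from earlier in the thread without the Prefect `task` decorator, so the sketch runs with the standard library only.

```python
import asyncio
from functools import wraps


def run_async(fn):
    """Wrap an `async def` function so it can be called synchronously."""
    @wraps(fn)
    def inner(*args, **kwargs):
        # asyncio.run creates a new event loop and closes it on return.
        return asyncio.run(fn(*args, **kwargs))
    return inner


class FakePage:
    """Hypothetical stand-in for an async page object (not pyppeteer)."""

    async def title(self, url):
        await asyncio.sleep(0)  # pretend to do async I/O
        return "title of " + url


class FakeBrowser:
    """Hypothetical stand-in for an async browser object (not pyppeteer)."""

    def __init__(self):
        self.closed = False

    async def new_page(self):
        await asyncio.sleep(0)
        return FakePage()

    async def close(self):
        await asyncio.sleep(0)
        self.closed = True


@run_async
async def scrape_title(url):
    """Create the browser, open a page, use it, and close it in one loop."""
    browser = FakeBrowser()
    try:
        page = await browser.new_page()
        return await page.title(url)
    finally:
        # Clean up before the event loop goes away.
        await browser.close()


result = scrape_title("https://example.com")
print(result)
```

With this shape only plain data (here a string) leaves the function, so the result can be handed to downstream tasks without carrying any loop-bound object along.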
p.s. I'm new to prefect and so far it has been a breeze compared to airflow or luigi.
Ross Rochford
01/13/2021, 1:34 PM
Ernest
01/13/2021, 1:39 PM