Ross Rochford
11/14/2020, 5:37 AM
Zanie
Ross Rochford
11/19/2020, 1:43 PM
Ross Rochford
11/19/2020, 1:44 PM
Jim Crist-Harif
11/19/2020, 3:27 PM
import time
import asyncio
from functools import wraps

from prefect import task, Flow


def asyncio_task(fn=None, **kwargs):
    """A version of `prefect.task` that works for `async def` functions"""
    if fn is None:
        # Called with keyword arguments only: return a decorator that applies them.
        return lambda fn: asyncio_task(fn, **kwargs)

    @task(**kwargs)
    @wraps(fn)
    def inner(*args, **kwargs):
        # Run the coroutine to completion on a fresh event loop inside the task.
        return asyncio.run(fn(*args, **kwargs))

    return inner


@asyncio_task
async def multi_sleep(n1, n2):
    """Run two concurrent sleeps, showing concurrency within a single task"""
    start = time.time()
    await asyncio.gather(asyncio.sleep(n1), asyncio.sleep(n2))
    end = time.time()
    return end - start


with Flow("asyncio example") as flow:
    res = multi_sleep(3, 4)

out = flow.run()
print("Slept for %.1f seconds" % out.result[res].result)
Jim Crist-Harif
11/19/2020, 3:27 PM
$ python test.py
[2020-11-19 09:26:59-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'asyncio example'
[2020-11-19 09:26:59-0600] INFO - prefect.TaskRunner | Task 'multi_sleep': Starting task run...
[2020-11-19 09:27:03-0600] INFO - prefect.TaskRunner | Task 'multi_sleep': Finished task run for task with final state: 'Success'
[2020-11-19 09:27:03-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Slept for 4.0 seconds
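The 4.0 seconds reported above, rather than 7, comes from `asyncio.gather` running both sleeps on one event loop inside a single task. As a rough sketch of the same pattern without Prefect, the snippet below uses a standard-library thread pool as a stand-in for parallel task runs, with each worker starting its own event loop through `asyncio.run`. The names `run_in_own_loop` and `run_all` are hypothetical, and this is only an illustration of the idea, not a description of how Prefect's executors are implemented.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


async def multi_sleep(n1, n2):
    """Run two sleeps concurrently and return the elapsed wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(asyncio.sleep(n1), asyncio.sleep(n2))
    return time.perf_counter() - start


def run_in_own_loop(n1, n2):
    """Synchronous entry point: each call creates and closes its own event loop."""
    return asyncio.run(multi_sleep(n1, n2))


def run_all(pairs, max_workers=2):
    """Run each (n1, n2) pair in a worker thread (a stand-in for parallel tasks)."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda pair: run_in_own_loop(*pair), pairs))


if __name__ == "__main__":
    start = time.perf_counter()
    elapsed = run_all([(0.2, 0.3), (0.2, 0.3)])
    total = time.perf_counter() - start
    print(["%.1f" % e for e in elapsed], "total %.1f" % total)
```

Each worker should take roughly as long as its longest sleep, and the two workers overlap, so the total stays close to a single worker's time.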
Jim Crist-Harif
11/19/2020, 3:29 PM
asyncio_task decorator), but have been holding out for a bit while gathering use cases.
TL;DR - prefect uses threads/processes for running tasks in parallel, but tasks are free to start an event loop inside them if they want to make use of some async library (like asyncio/trio).
Ross Rochford
11/19/2020, 3:43 PM
Ross Rochford
11/19/2020, 3:45 PM
Jim Crist-Harif
11/19/2020, 3:46 PM
Jim Crist-Harif
11/19/2020, 3:47 PM
Ross Rochford
11/19/2020, 3:50 PM
Chris White
Marvin
11/19/2020, 4:54 PM
Ernest
01/13/2021, 1:31 PM
asyncio_task or a bit more documentation for asynchronous use cases.
My use case: I'm using the pyppeteer library to automate browser tasks. Pyppeteer is asynchronous. The first task is to create a browser, then a new page, and then interact with that page. Every function call is asynchronous and has to be awaited. I have tried the method above and was able to create a browser instance, but haven't figured out how to pass it to other tasks.
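One possible way to approach this, sketched under assumptions: `asyncio.run` creates a new event loop and closes it when the coroutine finishes, so an object bound to that loop generally cannot be handed to a later `asyncio.run` call in another task. A simple workaround is to keep the create/open/interact steps inside one coroutine and wrap that whole coroutine once. `FakeBrowser`, `FakePage`, `launch`, and `scrape_title` below are hypothetical stand-ins so the snippet runs without a real browser. They are not the pyppeteer API.

```python
import asyncio


class FakePage:
    """Hypothetical stand-in for an async page object."""

    async def title(self):
        await asyncio.sleep(0)
        return "example title"


class FakeBrowser:
    """Hypothetical stand-in for an async browser object (not the pyppeteer API)."""

    def __init__(self):
        # Remember which event loop created this object.
        self.loop = asyncio.get_running_loop()
        self.closed = False

    async def new_page(self):
        # Simulate a loop-bound resource: refuse to work from a different loop.
        if asyncio.get_running_loop() is not self.loop:
            raise RuntimeError("browser used from a different event loop")
        await asyncio.sleep(0)
        return FakePage()

    async def close(self):
        self.closed = True


async def launch():
    """Hypothetical async factory, playing the role of a browser launch call."""
    await asyncio.sleep(0)
    return FakeBrowser()


async def scrape_title():
    """Create the browser, open a page, and interact with it in one coroutine."""
    browser = await launch()
    try:
        page = await browser.new_page()
        return await page.title()
    finally:
        # Release the browser before the event loop shuts down.
        await browser.close()


def scrape_title_sync():
    """Synchronous wrapper, the shape a single task body could take."""
    return asyncio.run(scrape_title())


if __name__ == "__main__":
    print(scrape_title_sync())
```

The trade-off is coarser tasks: the browser steps become one unit of work rather than three, but nothing loop-bound has to cross a task boundary.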
p.s. I'm new to prefect and so far it has been a breeze compared to airflow or luigi.
Ross Rochford
01/13/2021, 1:34 PM
Ross Rochford
01/13/2021, 1:35 PM
Ernest
01/13/2021, 1:39 PM