Ross Rochford11/14/2020, 5:37 AM
Zanie11/17/2020, 7:28 PM
Ross Rochford11/19/2020, 1:43 PM
Jim Crist-Harif11/19/2020, 3:27 PM
import time import asyncio from functools import wraps from prefect import task, Flow def asyncio_task(fn=None, **kwargs): """A version of `prefect.task` that works for `async def ` functions""" if fn is None: return lambda fn: asyncio_task(fn, **kwargs) @task(**kwargs) @wraps(fn) def inner(*args, **kwargs): return asyncio.run(fn(*args, **kwargs)) return inner @asyncio_task async def multi_sleep(n1, n2): """Run two concurrent sleeps, showing concurrency within a single task""" start = time.time() await asyncio.gather(asyncio.sleep(n1), asyncio.sleep(n2)) end = time.time() return end - start with Flow("asyncio example") as flow: res = multi_sleep(3, 4) out = flow.run() print("Slept for %.1f seconds" % out.result[res].result)
$ python test.py [2020-11-19 09:26:59-0600] INFO - prefect.FlowRunner | Beginning Flow run for 'asyncio example' [2020-11-19 09:26:59-0600] INFO - prefect.TaskRunner | Task 'multi_sleep': Starting task run... [2020-11-19 09:27:03-0600] INFO - prefect.TaskRunner | Task 'multi_sleep': Finished task run for task with final state: 'Success' [2020-11-19 09:27:03-0600] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded Slept for 4.0 seconds
decorator), but have been holding out for a bit while gathering use cases. TL;DR - prefect uses threads/processes for running tasks in parallel, but tasks are free to start an event loop inside them if they want to make use of some async library (like asyncio/trio).
Ross Rochford11/19/2020, 3:43 PM
Jim Crist-Harif11/19/2020, 3:46 PM
Ross Rochford11/19/2020, 3:50 PM
Chris White11/19/2020, 4:54 PM
Ernest01/13/2021, 1:31 PM
or a bit more documentation for asynchronous use cases. My use case. I'm using pyppeteer library for automating browser tasks. Pyppeteer is asynchronous. First task is to create a browser, then new page and then interact with that page. Every function/call is asynchronous and has to be awaited. Have tried to use method above and been able to create a browser instance but haven't figured out how to pass it to other tasks. p.s. I'm new to prefect and so far it has been a breeze compared to airflow or luigi.
Ross Rochford01/13/2021, 1:34 PM
Ernest01/13/2021, 1:39 PM