Gabriel Lespérance
07/29/2024, 7:50 PM
Marvin
07/29/2024, 7:50 PM
python
from prefect import flow, task

@task(task_run_name="Adding {x} and {y}")
def add(x: int, y: int) -> int:
    return x + y

@flow
def my_flow():
    first_result = add(1, 2)
    second_result = add(first_result, first_result)
    return add(first_result, second_result)

if __name__ == "__main__":
    my_flow()
    assert add.fn(1, 2) == 3
Sources:
- Concept of Flows: https://docs.prefect.io/latest/concepts/flows/
- Tutorial on Tasks: https://docs.prefect.io/latest/tutorial/tasks/
The example demonstrates a flow calling tasks to perform computations. Tasks must be called within a flow to be executed; `add.fn(1, 2)` calls the underlying Python function directly, which is why it works outside the flow.
Would you like to know anything else?
Gabriel Lespérance
07/29/2024, 7:57 PM
Marvin
07/29/2024, 7:57 PM
python
from prefect import task, flow

@task(retries=3, retry_delay_seconds=10)
def failing_task():
    raise ValueError("Something went wrong!")

@flow
def retry_flow():
    failing_task()

if __name__ == "__main__":
    retry_flow()
Flows:
- Retries: Flows can also be configured to retry.
- Retries Example:
python
from prefect import flow, task

@task
def always_fails():
    raise ValueError("Oh no!")

@flow(retries=2, retry_delay_seconds=5)
def retry_flow():
    always_fails()

if __name__ == "__main__":
    retry_flow()
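The following plain-Python sketch shows what these settings mean. With `retries=3`, a task that keeps failing is attempted four times: the first run plus three retries, with `retry_delay_seconds` between attempts. With `retries=2` on the flow, there are three attempts. The sketch models only that counting logic. `run_with_retries` is a hypothetical helper, not part of Prefect, and it assumes the last exception is re-raised once retries are exhausted.

```python
import time


def run_with_retries(fn, retries: int, retry_delay_seconds: float):
    """Hypothetical helper: call fn, retrying up to `retries` extra times.

    Returns (result, attempts_used); re-raises the last error when retries run out.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            if attempts > retries:
                raise  # retries exhausted: surface the last error
            time.sleep(retry_delay_seconds)


calls = {"n": 0}


def flaky():
    # Fails on the first two calls, succeeds on the third.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("Something went wrong!")
    return "ok"


result, attempts = run_with_retries(flaky, retries=3, retry_delay_seconds=0.01)
print(result, attempts)  # ok 3

failures = {"n": 0}


def always_fails():
    failures["n"] += 1
    raise ValueError("Oh no!")


try:
    run_with_retries(always_fails, retries=2, retry_delay_seconds=0.01)
except ValueError:
    print(f"gave up after {failures['n']} attempts")  # gave up after 3 attempts
```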
Would you like any further details or examples on configuring retries for tasks and flows?
Gabriel Lespérance
07/29/2024, 7:58 PM
Nate
07/29/2024, 8:41 PM
Gabriel Lespérance
07/29/2024, 8:47 PM
from functools import wraps
from prefect import flow, task

def proxyflow(func):
    # wrap an async callable (here: a flow) in a task so it can be submitted
    @task
    @wraps(func)
    async def wrapper(*args, **kwargs):
        return await func(*args, **kwargs)
    return wrapper

def flowtask(func=None, **flow_kwargs):
    def decorator(func):
        @proxyflow
        @flow(**flow_kwargs)
        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await func(*args, **kwargs)
        return wrapper
    # support both @flowtask and @flowtask(**flow_kwargs)
    if func is None:
        return decorator
    else:
        return decorator(func)
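`flowtask` uses the optional-argument decorator pattern (`func=None, **flow_kwargs`). That lets it be applied bare (`@flowtask`) or with keyword arguments (`@flowtask(name=...)`). The sketch below isolates that pattern without Prefect. `labelled` and its `label` attribute are hypothetical names chosen for illustration.

```python
from functools import wraps


def labelled(func=None, **options):
    """Hypothetical decorator usable as @labelled or @labelled(label=...)."""

    def decorator(f):
        @wraps(f)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)

        # Record the options so we can see which form was used.
        wrapper.label = options.get("label", f.__name__)
        return wrapper

    if func is None:
        # Called with arguments, e.g. @labelled(label="x"): return the real decorator.
        return decorator
    # Called bare, e.g. @labelled: func is the decorated function itself.
    return decorator(func)


@labelled
def add(x, y):
    return x + y


@labelled(label="multiply")
def mul(x, y):
    return x * y


print(add(1, 2), add.label)  # 3 add
print(mul(2, 3), mul.label)  # 6 multiply
```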
and just using `flowtask` for everything but the starting flow and end tasks; it saves a lot of headaches
Nate
07/29/2024, 8:49 PM
> I've found moments where chaining flows would run everything sequentially, then in parallel if I wrapped the flows in tasks…
hm, I'd be curious to see an example of this. both should behave like Python functions do in general, with the exception that tasks have the `submit` / `map` interface as syntactic sugar for concurrency
Nate
07/29/2024, 8:50 PM
Gabriel Lespérance
07/29/2024, 8:53 PM
`count_to_proxy` sequentially
import asyncio
import time
from prefect import flow, task
from prefect_ray import RayTaskRunner
import ray

rtr = RayTaskRunner()

@task
async def shout(number):
    time.sleep(5)
    print(f"#{number}")

@flow(task_runner=rtr, log_prints=True)
async def count_to(highest_number):
    results = [shout.submit(number) for number in range(highest_number)]
    return results

@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)

@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to_proxy(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
Gabriel Lespérance
07/29/2024, 8:54 PM
import asyncio
from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")

@task
async def count_to(highest_number):
    # This doesn't work
    results = [shout.submit(number) for number in range(highest_number)]
    results[-1].result()
    # This works
    # results = await asyncio.gather(*[shout(number) for number in range(highest_number)])
    return results

@flow(task_runner=RayTaskRunner, log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to.submit(highest_number) for _ in range(times)]
    return results

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
error:
(count_to pid=30383) 16:55:32.565 | INFO | Task run 'count_to-c85a8294-33ca-4745-9d17-244a8b5e8b2f' - Created task run 'count_to-c85a8294-33ca-4745-9d17-244a8b5e8b2f' for task 'count_to'
(count_to pid=30387) 16:55:32.836 | ERROR | Task run 'count_to-0fc46acd-5df6-4520-915b-1f20ffd47f75' - Task run failed with exception: RuntimeError('The task runner must be started before submitting work.') - Retries are exhausted
Nate
07/29/2024, 8:56 PM
Gabriel Lespérance
07/29/2024, 8:56 PM
Gabriel Lespérance
07/29/2024, 8:56 PM
Gabriel Lespérance
07/29/2024, 8:57 PM
import asyncio
from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")

@flowtask
async def count_to(highest_number):
    # This doesn't work
    results = [shout.submit(number) for number in range(highest_number)]
    results[-1].result()
    # This works
    # results = await asyncio.gather(*[shout(number) for number in range(highest_number)])
    return results

@flow(task_runner=RayTaskRunner, log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to.submit(highest_number) for _ in range(times)]
    return results

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
where `flowtask` is defined as in my previous message
Gabriel Lespérance
07/29/2024, 8:59 PM
Nate
07/29/2024, 9:09 PM
> time.sleep(5)
worth noting this will always block the event loop
but sorry if I'm missing some nuance: why all the misdirection / nesting?
import asyncio
from prefect import flow, task

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")

@task
def count_to(highest_number):
    futures = shout.map([i for i in range(highest_number)])
    return [f.result() for f in futures]

@flow(log_prints=True)
def many_flows(highest_number: int, times: int):
    futures = count_to.map([highest_number] * times)
    return [f.result() for f in futures]

if __name__ == "__main__":
    many_flows(10, 3)
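Nate's point about `time.sleep(5)` can be checked with plain asyncio. `time.sleep` blocks the whole event loop, so coroutines that use it run one after another even under `asyncio.gather`. `await asyncio.sleep` instead yields control, which lets them overlap. The delays below are shortened to 0.2 s for illustration, and exact timings will vary by machine.

```python
import asyncio
import time


async def blocking_shout(number: int) -> int:
    time.sleep(0.2)  # blocks the event loop: no other coroutine can run meanwhile
    return number


async def non_blocking_shout(number: int) -> int:
    await asyncio.sleep(0.2)  # hands control back to the event loop while waiting
    return number


async def timed_gather(shout, n: int) -> float:
    """Run n shouts under asyncio.gather and return the wall-clock seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(shout(i) for i in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed_gather(blocking_shout, 3))
non_blocking_elapsed = asyncio.run(timed_gather(non_blocking_shout, 3))
print(f"time.sleep:    {blocking_elapsed:.2f}s (sleeps run back to back)")
print(f"asyncio.sleep: {non_blocking_elapsed:.2f}s (sleeps overlap)")
```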
Gabriel Lespérance
07/29/2024, 9:13 PM
import asyncio
from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")

@task
def count_to(highest_number):
    futures = shout.map([i for i in range(highest_number)])
    return [f.result() for f in futures]

@flow(log_prints=True, task_runner=RayTaskRunner())
async def many_flows(highest_number: int, times: int):
    futures = count_to.map([highest_number] * times)
    return [f.result() for f in futures]

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
Nate
07/29/2024, 9:17 PM
Nate
07/29/2024, 9:17 PM
Gabriel Lespérance
07/29/2024, 9:18 PM
import asyncio
from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")

@task
def count_to(highest_number):
    futures = [shout.submit(i) for i in range(highest_number)]
    return futures

@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    futures = [count_to.submit(highest_number) for _ in range(times)]
    return futures

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
but it breaks
Gabriel Lespérance
07/29/2024, 9:19 PM
> would you mind opening an issue?
sure
Nate
07/29/2024, 9:19 PM
> async def many_flows(highest_number: int, times: int):
>     futures = [count_to.submit(highest_number) for _ in range(times)]
>     return futures
> but it breaks
I would imagine it's because the futures are not being resolved, i.e. `.result()` is not being called on them
Gabriel Lespérance
07/29/2024, 9:20 PM
Gabriel Lespérance
07/29/2024, 9:20 PM
Gabriel Lespérance
07/29/2024, 9:20 PM
Nate
07/29/2024, 9:22 PM
In [1]: from prefect import flow, task

In [2]: @task
   ...: def foo(_):
   ...:     return 42
   ...:

In [3]: @flow
   ...: def bar():
   ...:     assert foo.map(range(10)).result() == [42] * 10
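Nate's `foo.map(range(10)).result()` resolves a whole batch of futures in one call. A rough standard-library analogue may help in reasoning about when `.result()` is needed. `submit` returns a future immediately, and `.result()` is the point where the caller blocks until the value exists. This uses `concurrent.futures`, not Prefect, and `resolve_all` is a hypothetical helper.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def foo(_):
    return 42


def resolve_all(futures: list[Future]) -> list:
    """Hypothetical helper: block until every future is done, keeping submission order."""
    return [f.result() for f in futures]


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(foo, i) for i in range(10)]  # returns immediately
    results = resolve_all(futures)  # blocks here until all ten have finished

print(results == [42] * 10)  # True
```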
Nate
07/29/2024, 9:24 PM
> but it's pretty unclear when I need to use what, i.e. when I should be using .result, or when just returning the future from submit is okay
feel free to let us know if anything is glaringly missing
Gabriel Lespérance
07/29/2024, 9:24 PM
Nate
07/29/2024, 9:24 PM
> could you tell me what's the deal with just calling the async methods too?
sorry, I'm not sure I understand the question
Gabriel Lespérance
07/29/2024, 9:27 PM
import asyncio
from prefect import flow, task

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")

@flow(log_prints=True)
async def count_to(highest_number):
    results = [shout.submit(number) for number in range(highest_number)]
    return results

@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)

@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to_proxy(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
this also runs sequentially
Nate
07/29/2024, 9:43 PM
• `count_to` doesn't resolve its futures, and neither does either of its parents, so we will throw to warn about that.
• if `count_to` did resolve its futures with `list_of_futures.result()`, then it would necessarily be blocking, and therefore only superficially async
the below is concurrent in the way you seem to want, although it seems we have some issue to root out with Ray / Dask
import asyncio
from prefect import flow, task

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")

@task
def count_to(highest_number):
    return shout.map(range(highest_number)).result()

@flow(log_prints=True)
def many_flows(highest_number: int, times: int):
    return count_to.map([highest_number] * times).result()

if __name__ == "__main__":
    many_flows(10, 3)
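Nate's `map`-based version fans out at both levels and only then resolves results. The same shape can be mirrored with nested `ThreadPoolExecutor`s from the standard library. That is a reasonable mental model given Nate's later description of the default runner as basically a thread pool, but treat it as an analogy, not Prefect's implementation. Sleeps are shortened to 0.2 s, so 3 × 10 concurrent shouts should finish in a fraction of the 6 s a sequential run would take.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def shout(number: int) -> int:
    time.sleep(0.2)  # blocking is fine here: each call gets its own worker thread
    return number


def count_to(highest_number: int) -> list[int]:
    # Inner fan-out: start every shout, then wait for all of them (like .map(...).result()).
    with ThreadPoolExecutor(max_workers=highest_number) as pool:
        return list(pool.map(shout, range(highest_number)))


def many_flows(highest_number: int, times: int) -> list[list[int]]:
    # Outer fan-out: run each count_to in its own thread, then wait for all of them.
    with ThreadPoolExecutor(max_workers=times) as pool:
        return list(pool.map(count_to, [highest_number] * times))


start = time.perf_counter()
results = many_flows(10, 3)
elapsed = time.perf_counter() - start
print(f"{len(results)} counts finished in {elapsed:.2f}s")
```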
Gabriel Lespérance
07/29/2024, 9:46 PM
Nate
07/29/2024, 9:46 PM
Gabriel Lespérance
07/29/2024, 9:47 PM
Gabriel Lespérance
07/29/2024, 9:47 PM
Nate
07/29/2024, 9:52 PM
import asyncio
import aiohttp
from prefect import flow, task

@task
async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

@task
async def transform_data(data):
    for item in data:
        yield item["id"], item["title"].upper()

@task
async def load_data(transformed_data):
    async for item_id, title in transformed_data:
        print(f"Loaded: {item_id} - {title}")

@flow
async def etl_pipeline(urls):
    for url in urls:
        data = await fetch_data(url)
        transformed = transform_data(data)
        await load_data(transformed)

if __name__ == "__main__":
    urls = [
        "https://jsonplaceholder.typicode.com/posts",
        "https://jsonplaceholder.typicode.com/todos",
    ]
    asyncio.run(etl_pipeline(urls))
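Nate's ETL example is built around an async generator (`transform_data`) that `load_data` consumes with `async for`. The sketch below shows that streaming shape on its own, without Prefect, aiohttp, or network access. The in-memory `sample` records are made up and stand in for the JSON that `fetch_data` would return.

```python
import asyncio


async def transform_data(data):
    # Async generator: yields one transformed record at a time.
    for item in data:
        yield item["id"], item["title"].upper()


async def load_data(transformed_data) -> list[str]:
    # Consume the async generator lazily with `async for`.
    loaded = []
    async for item_id, title in transformed_data:
        loaded.append(f"Loaded: {item_id} - {title}")
    return loaded


async def etl_pipeline(batches) -> list[str]:
    lines = []
    for data in batches:  # each batch stands in for one fetched URL
        lines.extend(await load_data(transform_data(data)))
    return lines


# Made-up records in the shape of the jsonplaceholder responses.
sample = [
    [{"id": 1, "title": "first post"}],
    [{"id": 2, "title": "second todo"}],
]
lines = asyncio.run(etl_pipeline(sample))
print(lines)  # ['Loaded: 1 - FIRST POST', 'Loaded: 2 - SECOND TODO']
```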
Gabriel Lespérance
07/29/2024, 9:54 PM
> However, asynchronous nested flows can run concurrently with AnyIO task groups or asyncio.gather.
ref: https://docs-3.prefect.io/3.0rc/develop/write-flows#asynchronous-functions
import asyncio
from prefect import task, flow

@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1)
        print(value, end=" ")

@flow
async def async_flow():
    print("Hello, I'm an async flow")
    # runs immediately
    await print_values([1, 2])
    # runs concurrently
    coros = [print_values("abcd"), print_values("6789")]
    await asyncio.gather(*coros)

if __name__ == "__main__":
    asyncio.run(async_flow())
so why is my async code example not running async?
Gabriel Lespérance
07/29/2024, 9:55 PM
@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)

@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to_proxy(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)
^ this is clearly not blocking
Gabriel Lespérance
07/29/2024, 9:56 PM
import asyncio
from prefect import flow, task

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")

@flow(log_prints=True)
async def count_to(highest_number):
    results = [shout.submit(number) for number in range(highest_number)]
    return results
and neither is this
Nate
07/29/2024, 9:57 PM
> • `count_to` doesn't resolve its futures, and neither does either of its parents, so we will throw to warn about that (i.e. it breaks)
> • if `count_to` did resolve its futures with `list_of_futures.result()`, then it would necessarily be blocking, and therefore only superficially async
I think I addressed this here, let me know if you think that's not the case
Gabriel Lespérance
07/29/2024, 10:01 PM
Nate
07/29/2024, 10:13 PM
using `submit` and `map` is sort of opting out of traditional async / await (don't hold me to this exact language, but we could make something like this clearer in the docs), since under the hood it's basically a `ThreadPoolExecutor` (or Dask / Ray) that "does the concurrency" for you, leaving you as the user with an all-sync interface to submit, wait on, or get `result`s from your work
so if you wanna go full asyncio you should be able to do this
import asyncio
from prefect import flow, task

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")

@flow
async def count_to(highest_number):
    return await asyncio.gather(*[shout(i) for i in range(highest_number)])

@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)

@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to_proxy(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
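With the Prefect decorators stripped away, the fully asyncio version above reduces to nested `asyncio.gather` calls. The plain-asyncio sketch below (sleeps shortened to 0.2 s) shows why it is concurrent end to end. Nothing in the chain blocks, so 3 × 10 sleeps should finish in roughly the time of one.

```python
import asyncio
import time


async def shout(number: int) -> int:
    await asyncio.sleep(0.2)  # non-blocking: yields to the event loop
    return number


async def count_to(highest_number: int) -> list[int]:
    # Every shout in this count is scheduled at once and awaited together.
    return list(await asyncio.gather(*(shout(i) for i in range(highest_number))))


async def many_flows(highest_number: int, times: int) -> list[list[int]]:
    # Each count_to also runs concurrently with the others.
    return list(await asyncio.gather(*(count_to(highest_number) for _ in range(times))))


start = time.perf_counter()
results = asyncio.run(many_flows(10, 3))
elapsed = time.perf_counter() - start
print(f"{len(results)} counts finished in {elapsed:.2f}s")
```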
Nate
07/29/2024, 10:14 PM
`count_to` for traditional asyncio
Gabriel Lespérance
07/29/2024, 10:16 PM
Gabriel Lespérance
07/29/2024, 10:16 PM
Gabriel Lespérance
07/29/2024, 10:16 PM
import asyncio
from prefect import flow, task

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")
    return number

@flow(log_prints=True)
async def count_to(highest_number):
    futures = [shout.submit(number) for number in range(highest_number)]
    results = [future.result() for future in futures]
    return results

@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    futures = [count_to(highest_number) for _ in range(times)]
    results = await asyncio.gather(*futures)
    print(f"ALL DONE: {results}")
    return results

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
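Nate's remark that `results = [future.result() for future in futures]` is blocking can be reproduced with the standard library. In the sketch below, a `concurrent.futures` future stands in for a Prefect future. This is an analogy, not Prefect's API. Calling `.result()` inside a coroutine freezes the event loop, so `asyncio.gather` runs the three `count_to` calls back to back. Awaiting the futures through `asyncio.wrap_future` instead lets all three overlap.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# A thread pool stands in for a task runner; its futures stand in for Prefect futures.
POOL = ThreadPoolExecutor(max_workers=30)


def shout(number: int) -> int:
    time.sleep(0.2)  # runs in a worker thread, not on the event loop
    return number


async def count_to_blocking(highest_number: int) -> list[int]:
    futures = [POOL.submit(shout, n) for n in range(highest_number)]
    # .result() blocks the event loop thread, so sibling coroutines cannot start yet.
    return [f.result() for f in futures]


async def count_to_awaiting(highest_number: int) -> list[int]:
    futures = [POOL.submit(shout, n) for n in range(highest_number)]
    # wrap_future makes each concurrent future awaitable; the loop stays free meanwhile.
    return list(await asyncio.gather(*(asyncio.wrap_future(f) for f in futures)))


async def many_flows(count_to, highest_number: int, times: int):
    start = time.perf_counter()
    results = await asyncio.gather(*(count_to(highest_number) for _ in range(times)))
    return list(results), time.perf_counter() - start


blocking_results, blocking_elapsed = asyncio.run(many_flows(count_to_blocking, 10, 3))
awaiting_results, awaiting_elapsed = asyncio.run(many_flows(count_to_awaiting, 10, 3))
POOL.shutdown()
print(f"blocking .result(): {blocking_elapsed:.2f}s, awaited: {awaiting_elapsed:.2f}s")
```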
Nate
07/29/2024, 10:17 PM
> results = [future.result() for future in futures]
since this line is blocking
Gabriel Lespérance
07/29/2024, 10:20 PM
import asyncio
from prefect import flow, task

@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")
    return number

@flow(log_prints=True)
async def count_to(highest_number):
    futures = [shout.submit(number) for number in range(highest_number)]
    return futures

@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    futures = [count_to(highest_number) for _ in range(times)]
    results = await asyncio.gather(*futures)
    print(f"ALL DONE: {results}")
    return results

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
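This version returns futures without resolving them. Later in the thread, Nate clarifies that Prefect then implicitly waits for them when the flow returns, rather than raising. The standard library has a comparable behaviour, shown below as an analogy only. Leaving a `ThreadPoolExecutor` `with` block calls `shutdown(wait=True)`, so work submitted without ever calling `.result()` still completes before the function returns.

```python
import time
from concurrent.futures import ThreadPoolExecutor

finished = []


def shout(number: int) -> None:
    time.sleep(0.1)
    finished.append(number)  # list.append is thread-safe in CPython


def count_to(highest_number: int) -> None:
    with ThreadPoolExecutor() as pool:
        for number in range(highest_number):
            pool.submit(shout, number)  # the returned futures are never resolved
    # Leaving the with-block calls pool.shutdown(wait=True),
    # which waits for every submitted call to finish.


count_to(10)
print(sorted(finished) == list(range(10)))  # True
```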
Gabriel Lespérance
07/29/2024, 10:20 PM
Gabriel Lespérance
07/29/2024, 10:21 PM
Gabriel Lespérance
07/29/2024, 10:21 PM
Nate
07/29/2024, 10:24 PM
Gabriel Lespérance
07/29/2024, 10:25 PM
Gabriel Lespérance
07/29/2024, 10:28 PM
Gabriel Lespérance
07/29/2024, 10:29 PM
> `count_to` doesn't resolve its futures, and neither does either of its parents, so we will throw to warn about that (i.e. it breaks)
I never saw anything like that, so it might be failing silently
Nate
07/29/2024, 10:37 PM
`submit` / `map` is easy mode for our many users who have no interest in learning the ins and outs of asyncio / anyio / thread pools
3. yes! we will look into the Ray problem. you may also be interested in the less-easy-mode client option (bottom)
4. yeah, unfortunately this is how it worked in Prefect 2, so to avoid breaking tons of user code we decided to stick with that for now, which I think I forgot we reintroduced! my bad. so this should say
> • `count_to` doesn't resolve its futures, and neither does either of its parents, so when we return from the flow we will implicitly wait for them
I'll also point out that this exists, a sort of intermediate option if you already know Dask
Nate
07/29/2024, 10:37 PM
Gabriel Lespérance
07/29/2024, 10:52 PM
Gabriel Lespérance
07/29/2024, 10:59 PM
Nate
07/29/2024, 11:09 PM
`as_completed` utility
In [1]: from prefect import task, flow

In [2]: import asyncio

In [3]: from prefect.futures import as_completed

In [4]: @task
   ...: async def f(t):
   ...:     await asyncio.sleep(t)
   ...:     return t
   ...:

In [5]: @flow
   ...: def g():
   ...:     for fut in as_completed(f.map(range(10))):
   ...:         print(fut.result())
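For comparison, plain asyncio has its own `asyncio.as_completed`. Like the `as_completed` helper in Nate's session, it hands back results in completion order rather than submission order. The sketch below uses only the standard library, with sleeps in seconds spaced far enough apart that the finishing order is deterministic in practice.

```python
import asyncio


async def f(t: float) -> float:
    await asyncio.sleep(t)
    return t


async def g() -> list[float]:
    order = []
    # Yields awaitables in the order they finish, not the order they were created.
    for next_done in asyncio.as_completed([f(t) for t in (0.3, 0.1, 0.2)]):
        order.append(await next_done)
    return order


order = asyncio.run(g())
print(order)  # [0.1, 0.2, 0.3]
```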
Nate
07/29/2024, 11:11 PM
`result()` as well, but I don't believe we do now
Gabriel Lespérance
07/30/2024, 1:29 PM
import asyncio
import time
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect_flowtask import flowtask

@task(log_prints=True)
async def shout(number, _):
    time.sleep(5)
    print(f"#{number}")
    return number

@task()
async def count_to(highest_number):
    futures = []
    for number in range(highest_number):
        # pass the previous future in so each shout waits for the one before it
        futures.append(shout.submit(number, futures[-1] if futures else None))
    return futures

@flow(task_runner=ConcurrentTaskRunner())
async def many_flows(highest_number: int, times: int):
    futures = await asyncio.gather(*[count_to(highest_number) for _ in range(times)])
    print(f"ALL DONE: {[f.result() for l in futures for f in l]}")

if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
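Gabriel's last version builds an explicit chain inside each `count_to` by passing the previous future into the next `shout.submit(...)`. Shouts within one count therefore run in order, while separate counts can overlap. The sketch below reproduces the same dependency shape in plain asyncio as an analogy. An `asyncio.Task` stands in for a Prefect future, and nothing here models `ConcurrentTaskRunner` itself. With five 0.1 s steps per chain and three chains, expect roughly 0.5 s rather than 1.5 s.

```python
import asyncio
import time
from typing import Optional


async def shout(number: int, previous: Optional[asyncio.Task]) -> int:
    if previous is not None:
        await previous  # wait for the previous link in this chain to finish
    await asyncio.sleep(0.1)
    return number


async def count_to(highest_number: int) -> list[int]:
    tasks = []
    for number in range(highest_number):
        previous = tasks[-1] if tasks else None
        # Passing the previous task in makes each shout depend on the one before.
        tasks.append(asyncio.create_task(shout(number, previous)))
    return list(await asyncio.gather(*tasks))


async def many_flows(highest_number: int, times: int) -> list[list[int]]:
    # The chains have no dependency on each other, so they overlap.
    return list(await asyncio.gather(*(count_to(highest_number) for _ in range(times))))


start = time.perf_counter()
results = asyncio.run(many_flows(5, 3))
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```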