# marvin-in-the-wild
**Gabriel:**
@Marvin what's the difference between a task and a flow? can I just use flows?
**Marvin:**
Here's a breakdown of the difference between tasks and flows in Prefect:

Tasks:
- Granular units of work: tasks perform discrete, specific pieces of work (e.g. calling an API, performing a database operation).
- Parallel/distributed execution: well suited for parallel or distributed execution using frameworks like Dask or Ray.
- Debugging: granular tasks make it easier to troubleshoot and fix issues.

Flows:
- Container for workflow logic: flows are containers that define the overall logic of the workflow. They can include tasks and other flows (subflows).
- Tracked execution: each invocation of a flow is tracked, and all state transitions are reported to the API.
- Enhanced functionality: flows support retries, timeouts, automatic logging, and type checking.
- Subflows: flows can call other flows, enabling the grouping of related tasks.

In summary, while you can technically put everything into a single flow, breaking logic into smaller, reusable tasks has significant benefits, especially for debugging and scalability. Example:
```python
from prefect import flow, task

@task(task_run_name="Adding {x} and {y}")
def add(x: int, y: int) -> int:
    return x + y

@flow
def my_flow():
    first_result = add(1, 2)
    second_result = add(first_result, first_result)
    return add(first_result, second_result)

if __name__ == "__main__":
    my_flow()
    assert add.fn(1, 2) == 3
```

Sources:
- Concept of Flows: https://docs.prefect.io/latest/concepts/flows/
- Tutorial on Tasks: https://docs.prefect.io/latest/tutorial/tasks/

The example demonstrates a flow calling tasks to perform computations. Tasks must be called within a flow to be executed. *Would you like to know anything else?*
**Gabriel:**
@Marvin but tasks also support retries
**Marvin:**
You're correct; both tasks and flows in Prefect support retries, along with other features such as timeouts and automatic logging. Here's a brief clarification:

Tasks:
- Retries: tasks can be configured to retry upon failure. Example:
```python
from prefect import task, flow

@task(retries=3, retry_delay_seconds=10)
def failing_task():
    raise ValueError("Something went wrong!")

@flow
def retry_flow():
    failing_task()

if __name__ == "__main__":
    retry_flow()
```

Flows:
- Retries: flows can also be configured to retry. Example:
```python
from prefect import flow, task

@task
def always_fails():
    raise ValueError("Oh no!")

@flow(retries=2, retry_delay_seconds=5)
def retry_flow():
    always_fails()

if __name__ == "__main__":
    retry_flow()
```

Would you need any further details or examples on configuring retries for tasks and flows?
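For the timeouts mentioned alongside retries, a minimal sketch, assuming the `timeout_seconds` keyword that both decorators accept:

```python
import time

from prefect import flow, task


@task(timeout_seconds=10)
def slow_task():
    # the task run is marked as failed if this takes longer than 10 seconds
    time.sleep(1)


@flow(timeout_seconds=60, retries=1)
def slow_flow():
    # the flow accepts a timeout too, plus retries of the whole run
    slow_task()


if __name__ == "__main__":
    slow_flow()
```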
**Gabriel:**
@Marvin so they look and sound the same a lot
**Nate:**
yeah, it's a common question. they're not so different, but they have some clear-cut functionality differences:
- more task orchestration happens client-side for efficiency, which is why you'll see stuff like caching and `retry_condition_fn` available on tasks, but not flows

philosophically, there are a couple common ways we think about them differently (somewhat related to the functionality disparity):
- tasks are discrete pieces of work
- flows compose units of work

is that a satisfying enough answer?
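A minimal sketch of those task-only knobs, assuming Prefect's `retry_condition_fn`, `cache_key_fn`, and `cache_expiration` keywords (the `retry_on_timeout` helper and `fetch` task are made up for illustration):

```python
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash


def retry_on_timeout(task, task_run, state) -> bool:
    # only retry when the failure was a TimeoutError
    try:
        state.result()
    except TimeoutError:
        return True
    except Exception:
        return False
    return False


@task(
    retries=3,
    retry_condition_fn=retry_on_timeout,  # available on tasks, not flows
    cache_key_fn=task_input_hash,         # same for input-based caching
    cache_expiration=timedelta(hours=1),
)
def fetch(page: int) -> dict:
    return {"page": page}


@flow
def pipeline():
    return fetch(1)


if __name__ == "__main__":
    pipeline()
```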
**Gabriel:**
tbh it's both too close to each other and too different. I've found moments where chaining flows would run everything sequentially, then in parallel if I wrapped the flows in tasks… other inconsistencies too when we get a task_runner involved. I feel that from a DX pov I don't care as much which is which as long as I can chain them together in a relatively predictable manner. rn I'm experimenting with:
```python
from functools import wraps
from prefect import flow, task


def proxyflow(func):
    @task
    @wraps(func)
    async def wrapper(*args, **kwargs):
        return await func(*args, **kwargs)

    return wrapper


def flowtask(func=None, **flow_kwargs):
    def decorator(func):
        @proxyflow
        @flow(**flow_kwargs)
        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await func(*args, **kwargs)

        return wrapper

    if func is None:
        return decorator
    else:
        return decorator(func)
```

and just using `flowtask` for everything but the starting flow and end tasks, it saves a lot of headaches
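For illustration, a hypothetical way `flowtask` might be applied per that description; `load`, `middle_step`, and `entrypoint` are made-up names, and this assumes the decorator is defined exactly as in the block above:

```python
import asyncio

from prefect import flow, task


@task
async def load(x):
    return x * 2


@flowtask(log_prints=True)  # a flow on the inside, callable like a task on the outside
async def middle_step(x):
    return await load(x)


@flow
async def entrypoint():
    # the intermediate steps are proxied flows, so they can be gathered directly
    return await asyncio.gather(*[middle_step(i) for i in range(3)])


if __name__ == "__main__":
    asyncio.run(entrypoint())
```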
**Nate:**
> I've found moments where chaining flows would run everything sequentially, then in parallel if I wrapped the flows in tasks…

hm, I'd be curious to see an example of this. both should behave like python functions do in general, with the exception that task has the `submit` / `map` interface as syntactic sugar for concurrency

what are you trying to accomplish with a composite decorator?
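As a quick illustration of that sugar, a minimal sketch (`square` and `compare` are made-up names):

```python
from prefect import flow, task


@task
def square(x):
    return x * x


@flow
def compare():
    # submit: one future per explicit call
    one = square.submit(3)
    # map: one future per element of the iterable
    many = square.map([1, 2, 3])
    return one.result(), [f.result() for f in many]


if __name__ == "__main__":
    print(compare())  # (9, [1, 4, 9])
```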
**Gabriel:**
e.g. this runs the `count_to_proxy` sequentially:
```python
import asyncio
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner
import ray

rtr = RayTaskRunner()


@task
async def shout(number):
    time.sleep(5)
    print(f"#{number}")


@flow(task_runner=rtr, log_prints=True)
async def count_to(highest_number):
    results = [shout.submit(number) for number in range(highest_number)]
    return results


@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)


@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to_proxy(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```

This breaks:
```python
import asyncio
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner


@task
async def shout(number):
    asyncio.sleep(5)
    print(f"#{number}")


@task
async def count_to(highest_number):
    # This doesn't work
    results = [shout.submit(number) for number in range(highest_number)]
    results[-1].result()

    # This works
    # results = await asyncio.gather(*[shout(number) for number in range(highest_number)])

    return results


@flow(task_runner=RayTaskRunner, log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to.submit(highest_number) for _ in range(times)]
    return results


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```

error:
```
(count_to pid=30383) 16:55:32.565 | INFO    | Task run 'count_to-c85a8294-33ca-4745-9d17-244a8b5e8b2f' - Created task run 'count_to-c85a8294-33ca-4745-9d17-244a8b5e8b2f' for task 'count_to'
(count_to pid=30387) 16:55:32.836 | ERROR   | Task run 'count_to-0fc46acd-5df6-4520-915b-1f20ffd47f75' - Task run failed with exception: RuntimeError('The task runner must be started before submitting work.') - Retries are exhausted
```

**Nate:**
@Gabriel Lespérance what is the prefect version here?
**Gabriel:**
using the 3.0 rc
didn't have much luck w 2.x
however, this works as expected:
```python
import asyncio
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
async def shout(number):
    asyncio.sleep(5)
    print(f"#{number}")


@flowtask
async def count_to(highest_number):
    # This doesn't work
    results = [shout.submit(number) for number in range(highest_number)]
    results[-1].result()

    # This works
    # results = await asyncio.gather(*[shout(number) for number in range(highest_number)])

    return results


@flow(task_runner=RayTaskRunner, log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to.submit(highest_number) for _ in range(times)]
    return results


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```

where `flowtask` is defined as in my prev message
**Nate:**
the engine has been rewritten at least once between that issue and the RC 🙂
```python
time.sleep(5)
```
worth noting this will always block the event loop. but sorry if I'm missing some nuance, why all the misdirection / nesting?
```python
import asyncio

from prefect import flow, task


@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")


@task
def count_to(highest_number):
    futures = shout.map([i for i in range(highest_number)])
    return [f.result() for f in futures]


@flow(log_prints=True)
def many_flows(highest_number: int, times: int):
    futures = count_to.map([highest_number] * times)
    return [f.result() for f in futures]


if __name__ == "__main__":
    many_flows(10, 3)
```

**Gabriel:**
yeah, except it doesn't work the moment I decide to upgrade to a ray task runner:
```python
import asyncio

from prefect import flow, task

from prefect_ray import RayTaskRunner


@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")


@task
def count_to(highest_number):
    futures = shout.map([i for i in range(highest_number)])
    return [f.result() for f in futures]


@flow(log_prints=True, task_runner=RayTaskRunner())
async def many_flows(highest_number: int, times: int):
    futures = count_to.map([highest_number] * times)
    return [f.result() for f in futures]


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```

**Nate:**
okay. this seems like an issue with the ray task runner then (and likely dask as well)
would you mind opening an issue?
**Gabriel:**
I also don't always know when I need to use submit vs map, so for example this would have been my goto:
```python
import asyncio

from prefect import flow, task

from prefect_ray import RayTaskRunner


@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")


@task
def count_to(highest_number):
    futures = [shout.submit(i) for i in range(highest_number)]
    return futures


@flow(
    log_prints=True,
)
async def many_flows(highest_number: int, times: int):
    futures = [count_to.submit(highest_number) for _ in range(times)]
    return futures


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```

but it breaks
> would you mind opening an issue?
sure
**Nate:**
can you share the error?
```python
async def many_flows(highest_number: int, times: int):
    futures = [count_to.submit(highest_number) for _ in range(times)]
    return futures
```

> but it breaks

I would imagine it's because the futures are not being resolved, i.e. `.result()` being called on them
**Gabriel:**
so all in all, prefect is rn my #1 choice vs going to something like airflow or dagster. but it's pretty unclear when I need to use what, i.e. when I should be using `.result`, or when just returning the future from submit is okay
when `.map` is better
is there a way to wait on all the futures at the same time or do I need to iterate like in your code?
**Nate:**
yeah, we have a convenience method you can call on the list of futures (`wait` or `result`)
```python
In [1]: from prefect import flow, task

In [2]: @task
   ...: def foo(_):
   ...:     return 42
   ...:

In [3]: @flow
   ...: def bar():
   ...:     assert foo.map(range(10)).result() == [42] * 10
```

we also have docs on this here

> but it's pretty unclear when I need to use what, i.e. when I should be using `.result`, or when just returning the future from submit is okay

feel free to let us know if anything is glaringly missing
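A minimal sketch of those list-level conveniences, assuming Prefect 3's `prefect.futures.wait` helper alongside the `.result()` call shown above (`double` and `demo` are made-up names):

```python
from prefect import flow, task
from prefect.futures import wait


@task
def double(x):
    return x * 2


@flow
def demo():
    futures = double.map(range(5))
    # block until everything submitted above has finished...
    wait(futures)
    # ...then collect the values in order
    return futures.result()


if __name__ == "__main__":
    print(demo())  # [0, 2, 4, 6, 8]
```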
**Gabriel:**
could you tell me what's the deal with just calling the async methods too? it seems like it sorta works in getting the tasks logged… but that overall I'm better off calling `.map` and `.submit`
**Nate:**
> could you tell me what's the deal with just calling the async methods too?
sorry, I'm not sure I understand the question
**Gabriel:**
```python
import asyncio

from prefect import flow, task


@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")


@flow(log_prints=True)
async def count_to(highest_number):
    results = [shout.submit(number) for number in range(highest_number)]
    return results


@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)


@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to_proxy(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```

this also runs sequentially
**Nate:**
just talking through what I see above:
- `count_to` doesn't resolve its futures, and neither do either of the parents, so we will throw to warn about that.
- if `count_to` did resolve its futures with `list_of_futures.result()` then it would be necessarily blocking, and therefore superficially async

the below is concurrent in the way you seem to want, although it seems we have some issue to root out with ray / dask
```python
import asyncio

from prefect import flow, task


@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")


@task
def count_to(highest_number):
    return (shout.map(range(highest_number))).result()


@flow(log_prints=True)
def many_flows(highest_number: int, times: int):
    return (count_to.map([highest_number] * times)).result()


if __name__ == "__main__":
    many_flows(10, 3)
```

**Gabriel:**

**Nate:**
thank you!
**Gabriel:**
@Nate from the way I see you use the framework, you rarely use `asyncio.create_task` or anything related to async await
so you always go with `map` / `submit` and then get the `.result`
**Nate:**
not necessarily, that just seemed like the most direct way to accomplish what it looked like your code was trying to do. you're free to ditch the prefect syntax sugar and use async / await + for loops
```python
import asyncio

import aiohttp

from prefect import flow, task


@task
async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()


@task
async def transform_data(data):
    for item in data:
        yield item["id"], item["title"].upper()


@task
async def load_data(transformed_data):
    async for id, title in transformed_data:
        print(f"Loaded: {id} - {title}")


@flow
async def etl_pipeline(urls):
    for url in urls:
        data = await fetch_data(url)
        transformed = transform_data(data)
        await load_data(transformed)


if __name__ == "__main__":
    urls = [
        "<https://jsonplaceholder.typicode.com/posts>",
        "<https://jsonplaceholder.typicode.com/todos>",
    ]

    asyncio.run(etl_pipeline(urls))
```

**Gabriel:**
Right, that's what I understood as well from the example / doc:
> However, asynchronous nested flows can run concurrently with AnyIO task groups or asyncio.gather.
ref: https://docs-3.prefect.io/3.0rc/develop/write-flows#asynchronous-functions
```python
import asyncio
from prefect import task, flow


@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1)
        print(value, end=" ")


@flow
async def async_flow():
    print("Hello, I'm an async flow")

    # runs immediately
    await print_values([1, 2])  

    # runs concurrently
    coros = [print_values("abcd"), print_values("6789")]
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(async_flow())
```

so why is my async code example not running async?
```python
@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)


@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to_proxy(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)
```

^ this is clearly not blocking
```python
import asyncio

from prefect import flow, task


@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")


@flow(log_prints=True)
async def count_to(highest_number):
    results = [shout.submit(number) for number in range(highest_number)]
    return results
```

and neither is this
**Nate:**
> - `count_to` doesn't resolve its futures, and neither do either of the parents, so we will throw to warn about that (ie it breaks)
> - if `count_to` did resolve its futures with `list_of_futures.result()` then it would be necessarily blocking, and therefore superficially async

I think I addressed this here, let me know if you think that's not the case
**Gabriel:**
thanks for that. I guess it would help to clarify: it sounds like not resolving the futures makes it *not* async behind the scenes?
**Nate:**
> it sounds like not resolving the futures makes it *not* async behind the scenes?

I would say:
- you should always resolve your futures if you want to make sure the work happens
- opting into `submit` and `map` is sort of opting out of traditional async / await (don't hold me to this exact language, but we could make something like this more clear in the docs), since under the hood it's basically a `ThreadPoolExecutor` (or dask / ray) that "does the concurrency" for you, leaving you as the user with an all-sync interface to `submit`, `wait`, or get `result`s from your work

so if you wanna go full asyncio you should be able to do this
```python
import asyncio

from prefect import flow, task


@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")


@flow
async def count_to(highest_number):
    return await asyncio.gather(*[shout(i) for i in range(highest_number)])


@task
async def count_to_proxy(highest_number):
    return await count_to(highest_number)


@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    results = [count_to_proxy(highest_number) for _ in range(times)]
    return await asyncio.gather(*results)


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```

i.e. swap the use of the task runner in `count_to` for traditional `asyncio`
**Gabriel:**
^^ ah yeah
so we have a potential bug when asyncio + task-runners interact
b/c this is still sequential:
```python
import asyncio

from prefect import flow, task


@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")
    return number


@flow(log_prints=True)
async def count_to(highest_number):
    futures = [shout.submit(number) for number in range(highest_number)]
    results = [future.result() for future in futures]
    return results


@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    futures = [count_to(highest_number) for _ in range(times)]
    results = await asyncio.gather(*futures)
    print(f"ALL DONE: {results}")
    return results


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```

**Nate:**
> so we have a potential bug when asyncio + task-runners interact

possibly! I'm not convinced just yet
```python
results = [future.result() for future in futures]
```
since this line is blocking
**Gabriel:**
right, so removing the line we have:
```python
import asyncio

from prefect import flow, task


@task
async def shout(number):
    await asyncio.sleep(5)
    print(f"#{number}")
    return number


@flow(log_prints=True)
async def count_to(highest_number):
    futures = [shout.submit(number) for number in range(highest_number)]
    return futures


@flow(log_prints=True)
async def many_flows(highest_number: int, times: int):
    futures = [count_to(highest_number) for _ in range(times)]
    results = await asyncio.gather(*futures)
    print(f"ALL DONE: {results}")
    return results


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```

which is sequential
so my guess is that the flow, upon being returned futures, will perform a wait on them and block
this blocks the whole process
**Nate:**
yeah that sounds right
**Gabriel:**
Overall what I got is:
1. always resolve your futures when using `.submit` / `.map`
2. asyncio works as in python BUT using asyncio doesn't work w task runners
3. we can always call a subflow from another flow, but rn there's a bug in ray that makes it impossible
4. returning a prefect future within a flow will cause unexpected magic to happen; this can block and perform weirdly. Remember #1 and always resolve your prefect futures

I'd argue #4 should yell at me / completely break, given that #1 is what's the good practice
also you mentioned:
> `count_to` doesn't resolve its futures, and neither do either of the parents, so we will throw to warn about that (ie it breaks)

I never saw anything like that so it might be failing silently
**Nate:**
1. yes!
2. I would say generally asyncio / task runners are alternative ways to achieve concurrent work. `submit` / `map` is easy mode for our many users who have no interest in learning the ins and outs of `asyncio` / `anyio` / thread pools
3. yes! we will look into the ray problem. you may also be interested in the less easy mode client option (bottom)
4. yeah, unfortunately this is how it worked in prefect 2, so to avoid breaking tons of user code we decided to stick there for now, which I think I forgot we reintroduced! my bad. so this should say:
> `count_to` doesn't resolve its futures, and neither do either of the parents, so when we return from the flow we will implicitly wait for them

I'll also point out that this exists, a sort of intermediate option if you already know dask

thanks for all the feedback by the way!
**Gabriel:**
yeah nw! Thank you for clarifying, it helps a lot.
@Nate I guess one last question: is there a way to use task runners (ie submit / map) without blocking? ie I'd like an `await .result`
**Nate:**
so submission (submit/map) technically speaking is always non-blocking bc we're just creating the futures, it's the future resolution that's typically blocking. but we do have an `as_completed` utility
```python
In [1]: from prefect import task, flow

In [2]: import asyncio

In [3]: from prefect.futures import as_completed

In [4]: @task
   ...: async def f(t):
   ...:     await asyncio.sleep(t)
   ...:     return t
   ...:

In [5]: @flow
   ...: def g():
   ...:     for fut in as_completed(f.map(range(10))):
   ...:         print(fut.result())
```

I think it would also be possible for us to expose an async version of `result()` as well, but I don't believe we do now
**Gabriel:**
oh nice, that `as_completed` will come in handy, ty. I managed to move to full async with submit / map with just returning the futures
```python
import asyncio
import time

from prefect import flow, task

from prefect.task_runners import ConcurrentTaskRunner

from prefect_flowtask import flowtask


@task(log_prints=True)
async def shout(number, _):
    time.sleep(5)
    print(f"#{number}")
    return number


@task()
async def count_to(highest_number):
    futures = []
    for number in range(highest_number):
        futures.append(shout.submit(number, futures[-1] if futures else None))
    return futures


@flow(task_runner=ConcurrentTaskRunner())
async def many_flows(highest_number: int, times: int):
    futures = await asyncio.gather(*[count_to(highest_number) for _ in range(times)])
    print(f"ALL DONE: {[f.result() for l in futures for f in l]}")


if __name__ == "__main__":
    asyncio.run(many_flows(10, 3))
```