# ask-community
d
Y'all, I'm feeling a bit slow. In my mind, especially with Prefect 3, using native Python `async`/`await` seems to be the way to go, particularly for genuinely async tasks. However, I can't figure out how to schedule them optimally. Here's a real example. You'll notice something weird/suboptimal about the way it's written.
`b` depends on all of the `a` tasks, but `c` could be running completely independently.
```python
import asyncio


async def main():
    """Do something."""

    a_1 = task_a_n(n=1)
    a_2 = task_a_n(n=2)
    a_3 = task_a_n(n=3)
    a = await asyncio.gather(a_1, a_2, a_3)
    b = await task_b(needs=a) # note that this does not actually take a as input, but I'm just writing it here for clarity on dependencies

    c = await task_c()

    # Finally, message passing
    await task_d(needs=(b, c))
```
Using Prefect magic that seems to be going away, we could write:
```python
async def main():
    """Do something."""

    a_1 = task_a_n(n=1)
    a_2 = task_a_n(n=2)
    a_3 = task_a_n(n=3)
    b = task_b(wait_for=[a_1, a_2, a_3])

    c = task_c()

    # Finally, message passing
    await task_d(wait_for=[b, c])
```
What's the canonical or pythonic way to correctly represent these complex/transitive dependencies without doing something ugly?
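For reference, the dependency graph in the question (`b` after all three `a` tasks, `c` independent, `d` after both) can be expressed in plain `asyncio`, without Prefect, by wrapping the dependent chain in its own coroutine and gathering it alongside the independent branch. This is a minimal sketch assuming the tasks are ordinary coroutines; the task bodies, the `log` list, and the `a_then_b` helper are hypothetical stand-ins, and Prefect-specific behavior (states, retries, `wait_for`) is not modeled.

```python
import asyncio

# Hypothetical stand-ins for the tasks in the question; each records that it
# ran so the execution order can be inspected afterwards.
log: list[str] = []


async def task_a_n(n: int) -> int:
    await asyncio.sleep(0.01)
    log.append(f"a{n}")
    return n


async def task_b() -> str:
    await asyncio.sleep(0.01)
    log.append("b")
    return "b"


async def task_c() -> str:
    await asyncio.sleep(0.01)
    log.append("c")
    return "c"


async def task_d(b: str, c: str) -> str:
    log.append("d")
    return f"{b}+{c}"


async def a_then_b() -> str:
    # b depends on every a, so this chain finishes all the a's before starting b.
    await asyncio.gather(task_a_n(1), task_a_n(2), task_a_n(3))
    return await task_b()


async def main() -> str:
    # The a->b chain and c are independent, so they run concurrently;
    # d starts only after both branches have finished.
    b, c = await asyncio.gather(a_then_b(), task_c())
    return await task_d(b, c)


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints "b+c"
```

The design choice is to encode each dependency chain as its own coroutine, so `asyncio.gather` only ever combines branches that are genuinely independent, and no ordering has to be declared separately from the code's structure.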
j
`a_1, a_2, a_3 = task_a_n.map([1, 2, 3])` perhaps? https://docs.prefect.io/v3/develop/task-runners#mapping-over-iterables
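Outside Prefect, the fan-out that `.map` provides can be approximated with `asyncio.gather`, which returns results in the order of its arguments regardless of which coroutine finishes first. A minimal sketch; `map_async` is a hypothetical helper (not a Prefect or `asyncio` API), and `task_a_n` is a stand-in that deliberately finishes in reverse order to show that result order is still preserved.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def map_async(fn: Callable[[T], Awaitable[R]], items: Iterable[T]) -> list[R]:
    """Hypothetical helper: run fn once per item concurrently, results in input order."""
    # gather preserves argument order in its result list.
    return list(await asyncio.gather(*(fn(item) for item in items)))


async def task_a_n(n: int) -> int:
    # Stand-in task: higher n sleeps less, so later items finish first.
    await asyncio.sleep(0.01 * (4 - n))
    return n * 10


async def main() -> None:
    a_1, a_2, a_3 = await map_async(task_a_n, [1, 2, 3])
    print(a_1, a_2, a_3)  # prints "10 20 30"


if __name__ == "__main__":
    asyncio.run(main())
```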
d
Ah, I should've been more precise:
```python
async def main():
    """Do something."""

    a_1 = task_a_n.submit(n=1)
    a_2 = task_a_n.submit(n=2)
    a_3 = task_a_n.submit(n=3)
    b = task_b.submit(wait_for=[a_1, a_2, a_3])

    c = task_c.submit()

    # Finally, message passing
    await task_d.submit(wait_for=[b, c])
```
I'm using the `.submit` system to dispatch tasks, including `async` ones. Maybe that's totally allowed?
j
I haven't tried it, but the docs seem to imply that's fine to do. One thing you're doing is managing your dependencies manually with `wait_for`. Ideally you would let Prefect manage the dependencies by passing the return values from `task_a_n` to `task_b`, so that Prefect is in charge of the dependency order. Also, you don't want to `await task_d.submit(...)` on the last line; instead, use Prefect's waiting helpers (`from prefect.futures import wait`, or `resolve_futures_to_states`), since `submit` returns a `PrefectFuture`: https://prefect-python-sdk-docs.netlify.app/prefect/futures/
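The idea of letting the scheduler infer dependencies from task inputs, while keeping an explicit `wait_for` for side-effect tasks that return nothing, can be illustrated in plain `asyncio`. This is a sketch of the concept under stated assumptions, not how Prefect implements it: `submit`, `_resolve`, `events`, and all task functions are hypothetical, and the `wait_for` keyword here only mimics the spirit of Prefect's parameter of the same name.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable


async def _resolve(value: Any) -> Any:
    # Upstream tasks passed as arguments are awaited for their results;
    # lists are resolved element by element; plain values pass through.
    if isinstance(value, asyncio.Future):  # asyncio.Task subclasses Future
        return await value
    if isinstance(value, list):
        return [await _resolve(v) for v in value]
    return value


def submit(
    fn: Callable[..., Awaitable[Any]],
    *args: Any,
    wait_for: Iterable[asyncio.Future] = (),
    **kwargs: Any,
) -> asyncio.Task:
    """Hypothetical helper: schedule fn once its upstream tasks complete."""
    upstream = list(wait_for)

    async def runner() -> Any:
        # Ordering-only dependencies: wait for them, discard their results.
        for dep in upstream:
            await dep  # re-raises if that upstream task failed
        # Data dependencies: replace any task argument with its result.
        resolved_args = [await _resolve(a) for a in args]
        resolved_kwargs = {k: await _resolve(v) for k, v in kwargs.items()}
        return await fn(*resolved_args, **resolved_kwargs)

    return asyncio.create_task(runner())


async def task_a_n(n: int) -> int:
    await asyncio.sleep(0.01)
    return n


async def task_b(values: list[int]) -> int:
    return sum(values)


events: list[str] = []


async def task_c() -> None:
    # Side-effect only: nothing to pass downstream, so task_d uses wait_for.
    await asyncio.sleep(0.01)
    events.append("c done")


async def task_d(total: int) -> str:
    events.append("d ran")
    return f"total={total}"


async def main() -> str:
    a = [submit(task_a_n, n) for n in (1, 2, 3)]
    b = submit(task_b, a)  # data dependency, inferred from the argument
    c = submit(task_c)
    d = submit(task_d, b, wait_for=[c])  # ordering-only dependency on c
    return await d


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints "total=6"
```

Data dependencies stay visible in the call signatures, and `wait_for` is reserved for the cases where there is genuinely no value to pass along.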
d
I could artificially expose return values, but we do have some very side-effecty tasks that do something in the background without returning a value. Good catch on the last line there: I'll switch to a `wait` call.
j
Be careful managing background tasks manually in Prefect 3: a failed task doesn't automatically fail the flow. Read this section carefully: https://docs.prefect.io/v3/resources/upgrade-to-prefect-3#flow-final-states
d
Hmmm, interesting. If we don't suppress failures with `raise_on_failure=False`, will background tasks correctly cause the flow to fail?
btw: thanks for the quick responses + help!
Ohhhhh I just found this thread: https://github.com/PrefectHQ/prefect/issues/15008
^ that issue captures a lot of my confusion haha
j
I'd run a quick test flow to make sure errors are handled as expected. In my own code, I use `future.result()` to re-raise any exception raised by the task. Do your `task_a_n`s normally throw exceptions to indicate failure?
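The distinction between waiting on futures and retrieving their results can be shown with the standard library's `concurrent.futures`, which has the same split: `wait()` returns once the futures finish without raising, while `Future.result()` re-raises the task's exception. This is a stdlib analogy, not Prefect code; whether Prefect 3's `wait` and `PrefectFuture.result()` behave the same way in your version is worth confirming with a test flow, as suggested above. `side_effect_task` and `run_all` are hypothetical.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait


def side_effect_task(n: int) -> None:
    # Hypothetical background task: returns nothing, signals failure by raising.
    if n == 2:
        raise RuntimeError(f"task {n} failed")


def run_all(check_results: bool) -> list[Future]:
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(side_effect_task, n) for n in (1, 2, 3)]
        # wait() blocks until every future has finished, but it does not
        # raise, even though task 2 failed.
        wait(futures)
        if check_results:
            for f in futures:
                f.result()  # re-raises the exception stored in a failed future
    return futures


if __name__ == "__main__":
    futures = run_all(check_results=False)
    print([f.exception() for f in futures])  # [None, RuntimeError('task 2 failed'), None]
    try:
        run_all(check_results=True)
    except RuntimeError as exc:
        print(f"surfaced: {exc}")  # prints "surfaced: task 2 failed"
```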
d
Yes, they throw exceptions on failure.
I also haven't upgraded to Prefect 3 yet.
I'm working on that right now, so I'm trying to understand the nuances of `async` tasks to make sure the upgrade doesn't break everything 🙂
j
I was going the other way: adding more async tasks to a data pipeline that was very synchronous, so my experience isn't quite the same as what you're facing. Like I said, I'd run a quick test program just to make sure you're getting the error handling you expect.