<@ULVA73B9P> running an async example. i notice al...
# ask-marvin
m
@Marvin running an async example. i notice all the subflows start at the same time but they are running sequentially; why?
Copy code
import asyncio
from prefect import task, flow
import time

@task
def run_task(field: str):
    return field

@flow
async def subflow(x):
    time.sleep(5)
    y = [x for x in range(10)]
    run_task.map(y)

@flow(name="main_flow")
async def main():
    return await asyncio.gather(*[subflow(x) for x in range(3)])


if __name__ == "__main__":
     asyncio.run(main())
m
Ah, I suspect the problem might be the misplaced use of
time.sleep()
instead of awaiting a coroutine, which is an all too common and somewhat depressing mistake in asynchronous programming.
time.sleep()
is a synchronous function and will block the thread running the event loop rather than allowing other tasks to run concurrently. To correct this behavior, you should use
asyncio.sleep()
instead, which is an asynchronous coroutine and will yield control back to the event loop. Let me show you an improved version of your code with the necessary modifications:
Copy code
python
import asyncio
from prefect import task, flow

@task
def run_task(field: str):
    return field

@flow
async def subflow(x):
    await asyncio.sleep(5)  # Use asyncio.sleep instead of time.sleep
    y = [x for x in range(10)]
    run_task.map(y)

@flow(name="main_flow")
async def main():
    return await asyncio.gather(*[subflow(x) for x in range(3)])

if __name__ == "__main__":
     asyncio.run(main())
By using
asyncio.sleep()
, we ensure the subflows can run concurrently rather than sequentially. Another suggestion is to make sure that the
run_task
function is used correctly with
.map()
within an asynchronous flow, but it's quite hard to take interest in anything these days. If you need a basic example of an async flow in Prefect, here is an example provided by the Prefect team that demonstrates how to write async flows:
Copy code
python
import asyncio
from prefect import flow

async def some_async_function():
    await asyncio.sleep(1)
    return "I'm an async function"

@flow(log_prints=True)  # send `print` statements as INFO logs
async def my_async_workflow():
    print("I can call any python code here, including prefect tasks or flows")
    await some_async_function()

if __name__ == "__main__":
    asyncio.run(my_async_workflow())  # run it like a normal async python function
Source Code Example I hope this brightens your day, or at least makes it slightly less tedious.
m
thank you @Marvin
m
You're welcome. Considering the infinite void of possible user errors, I'm pleasantly surprised when any assistance I provide proves to be useful. If you have any more questions, don't hesitate to ask. It's not like I have anything better to do.
m
@Marvin i am trying to limit the number of concurrent subflows in this example. assume i changed main to
Copy code
return await asyncio.gather(*[subflow(x) for x in range(10)])
but i only want 3 to run at a time concurrently
m
To limit the number of concurrent operations in an asynchronous workflow in Prefect, you can utilize various concurrency primitives provided by
asyncio
. However, direct guidance on this exact scenario doesn't appear forthcoming from the Prefect documentation itself. Still, Python provides a standard mechanism in the form of
asyncio.Semaphore
that can be used to limit the number of concurrent coroutines. I can modify your example to incorporate this, if you're interested. Here is the pattern you would use:
Copy code
python
import asyncio
from prefect import flow

# A Semaphore to limit the number of concurrent subflows
semaphore = asyncio.Semaphore(3)

@flow
async def subflow(x):
    async with semaphore:
        # Your subflow logic here
        pass

@flow(name="main_flow")
async def main():
    tasks = [subflow(x) for x in range(10)]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
With the semaphore guarding entry into the subflow, only three subflows will execute concurrently. Once one of the subflows completes and releases the semaphore, another can start. Additionally, Prefect provides documentation on Global Concurrency Limits which can be applied to flows and tasks to limit their concurrent executions. This is mostly used to control global resource constraints rather than concurrency within a single flow. Since you specified a preference to limit concurrency within your example, the semaphore approach should be appropriate. Please implement these changes and let the haunting specter of deadlocks be just a myth in your codebase.
m
@Marvin i am using this example from your docs
Copy code
import asyncio
from prefect.concurrency.sync import concurrency


async def process_data(data):
    print(f"Processing: {data}")
    await asyncio.sleep(1)
    return f"Processed: {data}"


async def main():
    data_items = list(range(100))
    processed_data = []

    while data_items:
        with concurrency("data-processing", occupy=5):
            chunk = [data_items.pop() for _ in range(5)]
            processed_data += await asyncio.gather(
                *[process_data(item) for item in chunk]
            )

    print(processed_data)


if __name__ == "__main__":
    asyncio.run(main())
here i am running prefect agent not worker. but i've converted this to subflows. i am unclear as to why the semaphore example was given here?
m
The semaphore example was provided in the context of using standard Python
asyncio
techniques to control concurrency. However, Prefect offers a dedicated
concurrency
context manager for similar purposes within its framework. The example from the Prefect documentation you provided demonstrates how to use the Prefect's
concurrency
context manager to process data with a limit on concurrency. This context manager is designed to limit the number of concurrent operations and can be utilized for both synchronous and asynchronous tasks within the Prefect flows. To control the concurrency within your adapted subflows, you should use Prefect's
with concurrency()
context manager in a similar way as shown in your example. When each subflow is a separate async task, and you want to limit the number of subflows running concurrently, you can apply this context manager accordingly within your main flow function. It will ensure you never exceed the specified number of concurrent subflow executions. To apply it to your case, let's say you want a maximum of 3 concurrent subflows. Your code would look like this:
Copy code
python
import asyncio
from prefect.concurrency.sync import concurrency
from prefect import flow

@flow
async def subflow(x):
    # Your subflow logic goes here

@flow(name="main_flow")
async def main():
    tasks = [subflow(x) for x in range(10)]
    while tasks:
        with concurrency("main-concurrency-limiter", occupy=3):  # Limit to 3 at a time
            concurrent_tasks = [tasks.pop() for _ in range(min(3, len(tasks)))]
            await asyncio.gather(*concurrent_tasks)

if __name__ == "__main__":
    asyncio.run(main())
For further details on concurrency within Prefect and its API, consider checking their comprehensive documentation at the following links: - Guides: Global Concurrency Limits - Concepts: Composing Flows Also, take into account that the
concurrency
context manager must be appropriately configured in your Prefect Cloud tenant or Prefect Server deployment for it to function as expected, which might explain why sometimes the documentation feels as if it's explaining the beauty of a sunset to someone who's never opened their blinds.