Soham
05/19/2024, 5:53 AM
Marvin
05/19/2024, 5:54 AM
asyncio module. Define your flow with the `@flow` decorator and make sure it's an async function. Inside your async flow, you can call other async functions or tasks.
Here's a simplified Python code example that you can adapt to submit multiple flows concurrently:
```python
import asyncio

from prefect import flow


async def some_async_function():
    await asyncio.sleep(1)
    return "I'm an async function"


@flow(log_prints=True)  # send `print` statements as INFO logs
async def my_async_workflow():
    print("I can call any python code here, including prefect tasks or flows")
    # Example: await another_flow()
    await some_async_function()


if __name__ == "__main__":
    asyncio.run(my_async_workflow())  # run it like a normal async python function
```
For more elaborate examples, you'll have to incorporate different flows into the `my_async_workflow` function, handling concurrency with asyncio features like `asyncio.gather` or `asyncio.create_task`.
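As a rough illustration of the `asyncio.gather` approach, here is a minimal sketch that uses only the standard library. The names `extract_flow` and `parent_flow` are hypothetical; plain coroutines stand in for what would be `@flow`-decorated async functions in a Prefect project, so treat this as a sketch of the concurrency pattern rather than of Prefect's own behavior.

```python
import asyncio


# Hypothetical stand-in: in a Prefect project this would be an
# async function decorated with @flow.
async def extract_flow(source: str) -> str:
    await asyncio.sleep(0.1)  # simulate I/O-bound work
    return f"extracted:{source}"


# Hypothetical parent: also a plain coroutine here, an async flow in Prefect.
async def parent_flow(sources: list[str]) -> list[str]:
    # asyncio.gather schedules all coroutines concurrently and returns
    # their results in the order the coroutines were passed in.
    return await asyncio.gather(*(extract_flow(s) for s in sources))


if __name__ == "__main__":
    print(asyncio.run(parent_flow(["a", "b", "c"])))
```

Because the child coroutines overlap while they wait, the total runtime is close to that of the slowest child, not the sum of all of them.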
Check out the full example here.
Soham
05/19/2024, 6:23 AM
Marvin
05/19/2024, 6:25 AM
`.wait()` to await their completion. Here is an example snippet from the Prefect Task Runners documentation:
```python
from prefect import flow, task


@task
def my_task():
    pass


@flow
def my_flow():
    future = my_task.submit()
    final_state = future.wait(1)  # Wait one second max
```
2. Composing Flows: You can create subflow runs by calling one flow from within another. In this way, the subflows can be managed concurrently, although the default behavior is to block the parent flow until the child flow completes. This information is covered in the Prefect Flows documentation.
3. Work Pools: You can use work pools to set up groups of work that can be submitted for execution, managing priority and concurrency limits. This setup is used for scheduling and executing flow runs and is detailed in the Prefect Work Pools documentation.
Keep in mind that managing concurrency without async adds complexity: you will need to carefully orchestrate the submission and execution of your flows yourself.
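To make the submit-then-wait idea concrete without async, here is a minimal sketch using only the standard library's `concurrent.futures`. It is not Prefect's API: `run_job` and `run_all` are hypothetical names, and `run_job` stands in for whatever task or flow body you would actually submit.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def run_job(name: str) -> str:
    # Hypothetical stand-in for a task or flow body.
    return f"{name}: done"


def run_all(names: list[str], timeout: float = 5.0) -> list[str]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # submit() returns immediately with a future for each job.
        futures = [pool.submit(run_job, n) for n in names]
        # wait() blocks until every future finishes or the timeout expires.
        done, not_done = wait(futures, timeout=timeout)
        if not_done:
            raise TimeoutError(f"{len(not_done)} job(s) still running")
        # Read results in submission order, not completion order.
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(run_all(["flow-a", "flow-b", "flow-c"]))
```

The shape is the same as in the snippet above: submit work to get futures back, then wait on them with a bounded timeout instead of blocking indefinitely.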
If you are considering running flows in a distributed manner across different workers, it could be helpful to use Prefect's orchestration layer. This allows you to leverage the distributed execution capabilities built into Prefect 2.0, such as work queues and workers.
To explore more about these topics, you can read the corresponding sections in the Prefect documentation:
- Task Runners
- Composing Flows
- Work Pools