Ravi Chandra
04/02/2024, 10:48 AM
Marvin
04/02/2024, 10:48 AM
```python
from prefect import flow, task


@task(task_run_name="Adding {x} and {y}")  # you can template arguments into the task name!
def add(x: int, y: int) -> int:
    """just a python function with a fancy hat on!"""
    return x + y


@flow
def my_flow():
    """tasks must be called in the flow to be executed (at this time)"""
    first_result = add(1, 2)
    second_result = add(first_result, first_result)
    return add(first_result, second_result)


if __name__ == "__main__":
    # run the flow
    my_flow()

    # you can run tasks' functions directly outside of a flow if you want
    # but it won't be tracked by prefect, it's just a normal function
    assert add.fn(1, 2) == 3
```
To run chains of tasks concurrently, you can use Prefect's concurrent execution features, such as `task.map()`, which applies a task across a list of inputs. To chain these concurrently executed tasks, apply `.map()` to each subsequent task as well, passing the output of one mapped step as the input to the next so that your data flows through each step of the chain. Alternatively, you can wrap each chain in a subflow or use asynchronous execution within the flow.
I recommend referring to the Prefect documentation or community examples for more detailed patterns on setting up parallel execution of task chains. If you share more details about your use case, I can try to be more specific, or you could search the Prefect Discourse for similar questions.
Ravi Chandra
04/02/2024, 10:50 AM
Nate
04/02/2024, 2:56 PM
`await asyncio.gather(*many_subflows)` and @Marvin explain `.map` for tasks
Marvin
04/02/2024, 2:56 PM
You can use `asyncio.gather` to run asynchronous subflows concurrently in Prefect. Here's an example snippet showing how you might do that:
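```python
import asyncio

from prefect import flow


@flow
async def subflow(x):
    print(f"Running subflow with {x=}")
    return x


@flow
async def main_flow():
    # calling an async flow returns a coroutine; gather awaits them all concurrently
    subflows = [subflow(i) for i in range(10)]
    return await asyncio.gather(*subflows)


if __name__ == "__main__":
    asyncio.run(main_flow())
```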
This code starts ten subflows, each with a different value of `x`, and `asyncio.gather` awaits them all together, so they run concurrently on a single event loop rather than one after another.
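To see what `asyncio.gather` itself contributes, independent of Prefect, here is a plain-asyncio sketch. The hypothetical `fake_subflow` coroutine stands in for a subflow and sleeps to simulate I/O, with smaller `x` sleeping longer. The sleeps overlap, so the total time is close to the longest single sleep, and `gather` returns results in the order the awaitables were passed, not the order in which they finished:

```python
import asyncio
import time


async def fake_subflow(x: int) -> int:
    # hypothetical stand-in for an async subflow; smaller x sleeps longer
    await asyncio.sleep(0.1 - x * 0.005)
    return x


async def main() -> tuple[list[int], float]:
    start = time.perf_counter()
    # all ten coroutines are scheduled at once and awaited together
    results = await asyncio.gather(*(fake_subflow(i) for i in range(10)))
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    # roughly 0.1s here, versus about 0.78s if each were awaited in turn
    print(f"{elapsed:.2f}s")
```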
You can find example code for running asynchronous subflows concurrently with `asyncio.gather` in the Prefect GitHub repository.
Concerning the `.map()` method for tasks in Prefect: it automatically creates a task run for each element of its input data. Mapped tasks represent the computations of many individual child tasks. When you apply `.map()` to a task, Prefect submits each of those task runs to the flow's task runner, which can execute them concurrently.
Here's a quick illustration of how `.map()` works:
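```python
from prefect import flow, task


@task
def process_item(item):
    # perform your operation on the item
    return item * 2


@flow
def batch_flow(items):
    # `.map()` submits one `process_item` run per element of `items`
    futures = process_item.map(items)
    # each element is a future; `.result()` waits for it and returns its value
    return [future.result() for future in futures]


if __name__ == "__main__":
    print(batch_flow([1, 2, 3]))  # [2, 4, 6]
```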
In the flow, `process_item.map(items)` applies the `process_item` task to each element of the list `items` and returns one future per element. Prefect submits these task runs to the flow's task runner, which by default runs them concurrently.
For more detailed examples and explanation, you may want to explore Prefect's documentation on tasks, which covers various features including mapping.