<@ULVA73B9P> how can I run a set of chained tasks ...
# marvin-in-the-wild
r
@Marvin how can I run a set of chained tasks concurrently? For example, apply a chain of tasks across multiple pieces of data?
m
To run a set of chained tasks concurrently across multiple pieces of data, define your tasks and flow so that each chain can run independently of the others. Unfortunately, my search of the documentation turned up a rather bleak overview, and none of it is particularly useful for your case. It's all very tedious, isn't it? What I did find is a basic example of defining tasks and a flow in Prefect, though it does not show the concurrency you are looking for. For concurrent execution, you would create one chain per piece of data and make sure the chains do not depend on each other's outputs, so they can run at the same time. Here is sample code that defines tasks and a flow (without concurrency):
```python
from prefect import flow, task

@task(task_run_name="Adding {x} and {y}")  # you can template arguments into the task name!
def add(x: int, y: int) -> int:
    """just a python function with a fancy hat on!"""
    return x + y

@flow
def my_flow():
    """tasks must be called in the flow to be executed (at this time)"""
    first_result = add(1, 2)
    second_result = add(first_result, first_result)
    return add(first_result, second_result)

if __name__ == "__main__":
    # run the flow
    my_flow()

    # you can run tasks' functions directly outside of a flow if you want
    # but it won't be tracked by prefect, it's just a normal function
    assert add.fn(1, 2) == 3
```
To achieve concurrency of task chains, you would use Prefect's concurrent execution features, such as `task.map()` for applying a task across a list of inputs. To chain these concurrently executed tasks, apply `.map()` to the subsequent tasks as well, structuring your data so that it can be passed through each step of the chain. You could also create subflows or use asynchronous execution within the flow if needed. I recommend the Prefect documentation or community examples for more detailed patterns on running task chains in parallel. If you share more details of your use case, I could try to be more helpful, or you could search the Prefect Discourse for similar questions.
r
Can you run multiple subflows concurrently?
n
yes you can
await asyncio.gather(*many_subflows)
and @Marvin explain `.map` for tasks
m
You are correct; you can use `asyncio.gather` to run asynchronous subflows concurrently in Prefect. Here's an example snippet showing how you might do that:
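```python
import asyncio
from prefect import flow

@flow
async def subflow(x):
    print(f"Running subflow with {x=}")
    return x

@flow
async def main_flow():
    # calling an async flow returns a coroutine; nothing runs until it is awaited
    subflows = [subflow(i) for i in range(10)]
    # gather awaits all of the subflow coroutines concurrently
    await asyncio.gather(*subflows)

if __name__ == "__main__":
    asyncio.run(main_flow())
```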
This code runs multiple subflows asynchronously and concurrently. Each subflow executes with a different value of `x`, and they all run concurrently. You can find the example code for running asynchronous subflows concurrently with `asyncio.gather` in the Prefect GitHub repository.

Concerning the `.map()` method for tasks in Prefect: it automatically creates a task run for each element of its input data. A mapped task represents the computations of many individual child tasks. When you apply `.map()` to a task, Prefect submits one task run per input element to the flow's task runner, which can execute them concurrently. Here's a quick illustration of how `.map()` works:
```python
from prefect import task, flow

@task
def process_item(item):
    # perform your operation on the item
    return item * 2

@flow
def batch_flow(items):
    # `.map()` submits one `process_item` run per element of `items`
    # and returns futures for those runs
    results = process_item.map(items)
    return results
```
In the flow, you call `process_item.map(items)` to apply the `process_item` task to each element in the list `items`. Prefect then creates one task run per element and schedules those runs concurrently. For more detailed examples and explanation, see Prefect's documentation on tasks, which covers mapping among other features.
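To tie this back to the original question, here is the "one chain per piece of data" shape in plain `asyncio`, with no Prefect decorators so it runs anywhere. The step names (`fetch`, `transform`, `run_chain`) are hypothetical. The assumption is that in Prefect each `run_chain` would be an async `@flow` like `subflow` above and the steps would be async tasks:

```python
import asyncio


async def fetch(item: int) -> int:
    # stand-in for the first step of a chain (hypothetical)
    await asyncio.sleep(0.01)
    return item + 1


async def transform(value: int) -> int:
    # stand-in for the second step of a chain (hypothetical)
    await asyncio.sleep(0.01)
    return value * 2


async def run_chain(item: int) -> int:
    # steps inside one chain run in order: transform waits for fetch
    return await transform(await fetch(item))


async def run_all(items: list[int]) -> list[int]:
    # the chains themselves run concurrently; gather keeps the input order
    return await asyncio.gather(*(run_chain(item) for item in items))


if __name__ == "__main__":
    print(asyncio.run(run_all([1, 2, 3])))
```

Each chain stays sequential internally, while `asyncio.gather` overlaps the waiting time of the separate chains.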