Hi <@ULVA73B9P>, if I wanted to use the Concurrent...
# ask-marvin
m
Hi @Marvin, if I wanted to use the ConcurrentTaskRunner to concurrently run a series of tasks that each run a series of strictly sequential tasks, how would I best do this?
m
To run a series of tasks concurrently using the `ConcurrentTaskRunner` (which is an alias for `ThreadPoolTaskRunner`), and within each of these tasks run a series of strictly sequential tasks, you can structure your Prefect flow as follows:
1. Define your sequential tasks: Use the `@task` decorator to define each task. Ensure these tasks are executed sequentially by calling them in order within another task or flow.
2. Concurrently run tasks: Use the `ConcurrentTaskRunner` to execute these tasks concurrently. This is achieved by setting it as the task runner for your flow and submitting each task with `.submit()`; calling a task directly blocks until it finishes.

Here's a basic example:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def sequential_task_1():
    # Your logic here
    pass

@task
def sequential_task_2():
    # Your logic here
    pass

@task
def concurrent_task():
    # Direct calls block, so these two run strictly in order
    sequential_task_1()
    sequential_task_2()

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    # .submit() hands each task to the task runner so the instances run concurrently
    # Adjust the range for the number of concurrent tasks
    futures = [concurrent_task.submit() for _ in range(5)]
    # Wait for every submitted task before the flow returns
    for future in futures:
        future.wait()

if __name__ == "__main__":
    my_flow()
```
### Key Points:
- Sequential Execution: Within `concurrent_task`, tasks `sequential_task_1` and `sequential_task_2` are executed in order.
- Concurrent Execution: Multiple instances of `concurrent_task` run concurrently using `ThreadPoolTaskRunner`.

For more details, refer to the task runners documentation.
m
@Marvin what if I didn't want to explicitly call the sequential tasks, but use some sort of iterator?
m
To dynamically execute a series of tasks using an iterator, you can leverage Prefect's capabilities to create tasks and flows programmatically. This approach allows you to iterate over a list of tasks and execute them in a sequence without manually calling each one. Here's how you can achieve this:
1. Define a list of tasks: Create a list or any iterable containing the tasks you wish to execute sequentially.
2. Iterate and execute: Use a loop within a task or flow to iterate over this list and execute each task.

Here's an example implementation:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def task_a():
    # Task logic for A
    pass

@task
def task_b():
    # Task logic for B
    pass

@task
def task_c():
    # Task logic for C
    pass

@task
def execute_tasks_sequentially(tasks):
    # Named `step` so the loop variable does not shadow the imported `task` decorator
    for step in tasks:
        # A direct call blocks until the step finishes, which keeps the order strict
        step()

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    tasks = [task_a, task_b, task_c]
    execute_tasks_sequentially(tasks)

if __name__ == "__main__":
    my_flow()
```
### Key Points:
- Dynamic Task Execution: The `execute_tasks_sequentially` task iterates over the list `tasks` and executes each task in order.
- Concurrent Execution: The `ThreadPoolTaskRunner` allows multiple instances of this sequential execution to run concurrently if each one is started with `.submit()` rather than called directly.

This approach provides flexibility in managing and executing tasks dynamically without explicit calls for each task. For more advanced usage, you might consider exploring Prefect's orchestration capabilities for dynamic task creation and execution.
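The two ideas above (an iterator over steps, plus several such chains running at once) can be sketched without Prefect, using only the standard library's `ThreadPoolExecutor`. This is a plain-Python illustration of the pattern, not Prefect's API: `step_a`, `step_b`, `step_c`, `run_chain`, and `run_chains_concurrently` are hypothetical names, and the steps are assumed to take and return a single value.

```python
from concurrent.futures import ThreadPoolExecutor


def step_a(value):
    # Hypothetical first step
    return value + 1


def step_b(value):
    # Hypothetical second step
    return value * 2


def step_c(value):
    # Hypothetical third step
    return value - 3


def run_chain(steps, value):
    """Run the steps strictly in order, passing each result to the next step."""
    for step in steps:
        value = step(value)
    return value


def run_chains_concurrently(steps, inputs, max_workers=4):
    """Start one chain per input on a thread pool and collect results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(run_chain, steps, value) for value in inputs]
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(run_chains_concurrently([step_a, step_b, step_c], [1, 2, 3]))
```

Each chain stays sequential because `run_chain` calls its steps one after another inside a single worker thread; only the chains themselves overlap.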
n
i think you might be looking for .map, @Marvin make a .map example
m
To use Prefect's `.map` method for dynamically executing tasks with an iterator, you can leverage the `@task` decorator to create tasks that can be mapped over. Here's an example of how to use `.map` to run one task over a set of inputs:
```python
from prefect import flow, task, unmapped

@task
def add_n_to_items(items, n):
    return [item + n for item in items]

@flow
def my_flow():
    # Use .map to execute the task once per value of n; .map returns futures
    futures = add_n_to_items.map(unmapped([10, 20]), n=[1, 2, 3])
    # Resolve each future so the flow returns plain values
    return [future.result() for future in futures]

if __name__ == "__main__":
    my_flow()
```
### Explanation:
- Mapping over tasks: The `add_n_to_items.map()` method allows you to dynamically map the task over the iterable `n=[1, 2, 3]`.
- Unmapped arguments: The `unmapped()` function is used to treat an iterable argument as a constant for all mapped calls, so `[10, 20]` is used consistently across all iterations.
- Output: The flow will return a list of results for each mapping, like `[[11, 21], [12, 22], [13, 23]]`.

This approach is ideal for executing tasks over a set of inputs dynamically. You can find more details in the Prefect tasks documentation.
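To make the mapped-versus-unmapped distinction concrete, here is a plain-Python sketch of what the mapped call computes. It does not use Prefect and runs sequentially; `map_with_constant` is a hypothetical helper that only mimics the argument handling, with the constant argument passed whole to every call and the mapped argument split into one call per element.

```python
def add_n_to_items(items, n):
    # Same logic as the task above, without the @task decorator
    return [item + n for item in items]


def map_with_constant(func, constant, values):
    """Call func(constant, n=value) once per value, reusing `constant` unchanged."""
    return [func(constant, n=value) for value in values]


if __name__ == "__main__":
    # One call per n in [1, 2, 3], each receiving the full list [10, 20]
    print(map_with_constant(add_n_to_items, [10, 20], [1, 2, 3]))
```

The real `.map` differs in that each call becomes its own task run that the task runner can execute concurrently.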
n
hrm i didn't notice "strictly sequential" until after i commented
why do you want the concurrent task runner to run things in series?
m
Yes, it might just be that I've created a structural scenario that is worth taking back to the drawing board honestly. Effectively I want to run a series of flows defined using the same function that will process data for a range of different datasets using different input I want to iterate through. I'd like to be able to do this in parallel. For each of those datasets, I might want to process any number of dates of data but as individual dates, due to memory constraints. The issue is that part of the process involves updating the aws glue data catalog, which cannot be altered concurrently (and if you get unlucky there doesn't seem to be a particularly easy way to handle these locks without just spamming retries, which isn't practical). Right now then, at the top level I'm looping through the datasets and then at the lower level I am looping through dates, with that lower level running into these glue concurrency issues. Sorry about the long explanation, but any thoughts appreciated! 🙂
(so it's not really strictly sequential, a looser solution works if it can ensure I'm not going to hit the glue update at the same time)
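One way to picture that looser requirement is a shared lock around only the catalog update, while everything else runs in parallel. The sketch below uses the standard library rather than Prefect or AWS: `update_catalog` appends to an in-memory list as a stand-in for the real Glue call, and `process_dataset` and `process_all` are hypothetical names. A `threading.Lock` only protects threads inside one process, so this is an assumption that all dataset runs share a process; runs spread across processes or machines would need a shared limit instead (Prefect's global concurrency limits may be worth a look for that).

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Only one thread at a time may hold this lock
catalog_lock = threading.Lock()
# In-memory stand-in for the catalog; a real update would call the catalog service here
catalog_updates = []


def update_catalog(dataset, date):
    """Record an update while holding the lock, so updates never overlap."""
    with catalog_lock:
        catalog_updates.append((dataset, date))


def process_dataset(dataset, dates):
    """Process one dataset date by date; only the catalog update is serialized."""
    for date in dates:
        # Per-date processing would run here, outside the lock
        update_catalog(dataset, date)
    return dataset


def process_all(datasets, dates, max_workers=4):
    """Run one worker per dataset concurrently and wait for all of them."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_dataset, dataset, dates) for dataset in datasets]
        return [future.result() for future in futures]


if __name__ == "__main__":
    process_all(["dataset_a", "dataset_b"], ["2024-01-01", "2024-01-02"])
    print(len(catalog_updates))
```

Holding the lock only for the update keeps the datasets running in parallel for the rest of their work, while dates within a dataset still run one after another.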
Looks like I've fixed it, seems to have been a problem with how prefect futures were being used to track dependencies between the dataset runs rather than the runner. Thanks for the help both!