# ask-community
d
Hello, I am pretty new to Prefect, so I apologize if this has been answered before; I couldn't find a solution to my issue anywhere. I have a task that returns a nested list of IDs, something like this:
```python
groups = [
    [1, 3, 5],
    [2, 4, 6],
]
```
I need to process each ID individually. The problem is that the IDs within a group can be processed in parallel, but each group in the list depends on the previous group being fully processed first, like:
```python
for group in groups:
    process_id.map(group)  # `process_id` has side effects that affect the result of the next group of IDs
```
Is there a way to process the groups so that each group is parallelized through `map`, but the groups themselves are processed sequentially?
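To pin down the requirement, here is a minimal plain-Python sketch (no Prefect involved) of the ordering being asked for: IDs within a group run concurrently, and a group starts only after every ID in the previous group has finished. `process_id` here is a hypothetical stand-in that only records the order in which IDs complete.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List

completed: List[int] = []  # order in which IDs finished (for illustration only)


def process_id(node_id: int) -> int:
    # hypothetical stand-in for the real side-effecting task
    completed.append(node_id)
    return node_id


def process_groups(groups: List[List[int]]) -> List[List[int]]:
    results = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for group in groups:
            # pool.map submits every ID in this group at once (the fan-out);
            # list() blocks until the whole group is done, acting as a barrier
            results.append(list(pool.map(process_id, group)))
    return results


print(process_groups([[1, 3, 5], [2, 4, 6]]))  # [[1, 3, 5], [2, 4, 6]]
```

The IDs 1, 3 and 5 may finish in any order, but all three finish before 2, 4 or 6 starts.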
e
As far as I know, you could create two root nodes, and Prefect knows how to pass them to the executor. Correct me if I've misunderstood.
```python
flow = Flow("xxx", tasks=[group_handler_task(group) for group in groups])
```
Something like this.
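```python
from time import sleep

import prefect
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

# assumption: any parallel executor works here; LocalDaskExecutor is just an example
executor = LocalDaskExecutor()


@task
def hello_task(x):
    logger = prefect.context.get("logger")
    sleep(x)
    logger.info("Hello world {}".format(x))


with Flow("docker-example 3", executor=executor) as flow:
    [hello_task(x) for x in range(20)]  # 20 independent calls, i.e. 20 root tasks
```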
Verified: all 20 jobs are submitted to the executor immediately, because they are all root nodes in the DAG.
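This follows from how a DAG scheduler picks work: any node with no upstream dependencies is ready right away. A minimal illustration with the standard library's `graphlib` (Python 3.9+); this is not what Prefect uses internally, only the same idea:

```python
from graphlib import TopologicalSorter

# 20 tasks with no edges between them: every one of them is a root node
sorter = TopologicalSorter({x: set() for x in range(20)})
sorter.prepare()

# get_ready() returns every node whose predecessors are all done;
# with no edges at all, all 20 nodes are ready in the first wave
first_wave = sorter.get_ready()
print(len(first_wave))  # 20
```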
d
Thank you for the reply; however, I'm not sure how to apply this to my use case, which is closer to this:
```python
from functools import reduce
from typing import List

from prefect import Flow, task


@task
def get_id_groups_from_server() -> List:
    # this task gets groups of IDs from the server, so Prefect can build a hierarchical graph
    # the result would be something like:
    return [[1, 3, 5], [2, 4, 6]]  # group 0 are the children of objects in group 1, and so on


@task
def set_node_size(node):
    # the sizes of all the children need to be computed at this point
    if len(node.children) > 0:
        node.size = reduce(lambda x, y: x + y.size, node.children, 0)
    else:
        node.size = 1


# `executor` is defined elsewhere
with Flow("docker-example 3", executor=executor) as flow:
    groups = get_id_groups_from_server()
    processed_groups = ...  # some other tasks that process the IDs into objects

    # here's the problem: each entry in `processed_groups` holds the children of the
    # nodes in the next entry, so computing the size of nodes in `processed_groups[1]`
    # requires that the nodes in `processed_groups[0]` already have their sizes assigned
    network = set_node_size.map(processed_groups)
```
Hopefully that makes sense.
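The level-by-level dependency can be sketched in plain Python: if the groups are ordered children-first (as the comment on `get_id_groups_from_server` describes), assigning sizes one group at a time guarantees every child already has its size. The `Node` class and the example tree are hypothetical; the size rule mirrors `set_node_size` (a leaf has size 1, otherwise the sum of its children's sizes).

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    # hypothetical node type; the real objects come from the server
    node_id: int
    children: List["Node"] = field(default_factory=list)
    size: int = 0


def set_node_size(node: Node) -> None:
    # same rule as the task above: leaves count as 1, parents sum their children
    if node.children:
        node.size = sum(child.size for child in node.children)
    else:
        node.size = 1


def size_by_level(levels: List[List[Node]]) -> None:
    # levels[0] holds the deepest nodes; each later level holds their parents
    for level in levels:
        # nodes within one level are independent of each other, so this
        # inner loop is the part that could be parallelized (e.g. with `map`)
        for node in level:
            set_node_size(node)


leaf_1, leaf_3, leaf_5 = Node(1), Node(3), Node(5)
parents = [Node(2, [leaf_1, leaf_3]), Node(4, [leaf_5]), Node(6)]
size_by_level([[leaf_1, leaf_3, leaf_5], parents])
print([p.size for p in parents])  # [2, 1, 1]
```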
e
Let's wait for the Prefect staff to answer the question. In the meantime, instead of using the functional API, you can build the DAG directly with edges, e.g.:
```python
for group in groups:
    group.set_upstream(another_task)
```
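Building the dependencies by hand amounts to adding an edge from every node of group i to every node of group i+1. A sketch with the standard library's `graphlib` (an illustration of the graph shape, not Prefect's API; `barrier_graph` and `waves` are hypothetical helpers) shows that a scheduler then releases exactly one group per wave:

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def barrier_graph(groups: List[List[int]]) -> Dict[int, Set[int]]:
    # map each node to its predecessors: every node in the previous group
    graph: Dict[int, Set[int]] = {}
    previous: List[int] = []
    for group in groups:
        for node in group:
            graph[node] = set(previous)
        previous = group
    return graph


def waves(groups: List[List[int]]) -> List[Set[int]]:
    sorter = TopologicalSorter(barrier_graph(groups))
    sorter.prepare()
    result = []
    while sorter.is_active():
        ready = sorter.get_ready()  # everything runnable right now
        result.append(set(ready))
        sorter.done(*ready)  # pretend the whole wave has finished
    return result


print(waves([[1, 3, 5], [2, 4, 6]]))  # [{1, 3, 5}, {2, 4, 6}]
```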
d
I tried that, something like:
```python
from prefect import Task


class DepTask(Task):
    def run(self, groups):
        # `flow` and `set_node_size` come from the flow definition above
        r = list()
        for group in groups:
            t = set_node_size.map(group, flow=flow)
            self.set_upstream(t, flow=flow)
            r.append(t)
        return r
```
The problem with that is that `r` doesn't get resolved before the next task runs, so the next task receives a list of tasks as its parameter.
I thought maybe I was missing a place where I need to make the `set_node_size` tasks an upstream dependency of the next task in my process, but I can't find where.
k
Hey @dario, I’ll double-check, but I don’t think this can be done, for the reasons you described. Because execution is deferred, the grouping only exists at runtime, so you can’t loop over it while building the flow. You also can’t call `map` inside another task (I’ll double-check this too). You’ll probably need to restructure your functions to pull this off.
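The deferred-execution point can be illustrated with a toy model (not Prefect's actual implementation): calling a task while the flow is being built only records a placeholder, so there is no list yet to loop over; the real value exists only once the graph runs. `Deferred` and `get_id_groups` are hypothetical names.

```python
from typing import Any, Callable, List


class Deferred:
    """Toy placeholder for a task call recorded at flow-build time."""

    def __init__(self, fn: Callable[..., Any], *args: Any) -> None:
        self.fn = fn
        self.args = args

    def run(self) -> Any:
        # resolve upstream placeholders first, then call the function
        resolved = [a.run() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*resolved)


def get_id_groups() -> List[List[int]]:
    # hypothetical stand-in for the server call
    return [[1, 3, 5], [2, 4, 6]]


groups = Deferred(get_id_groups)  # build time: nothing has run yet

try:
    for group in groups:  # there is no list to iterate over yet
        pass
except TypeError as exc:
    print("cannot loop at build time:", exc)

print(groups.run())  # run time: [[1, 3, 5], [2, 4, 6]]
```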
d
Thank you @Kevin Kho, I thought this might be the case. Could you confirm that there's no way to use `map` from inside another task, and/or suggest an alternative way to do what I need? Thanks again!
k
There is no way. I believe you need to frame this as a task `LOOP`: once you have the groups, you `LOOP` over them and execute the tasks for each group in the order they need to run.
Otherwise, you’d need to split the groups manually, run their tasks separately, and specify the upstream dependencies yourself.
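In Prefect 1.x, a looping task reads the previous iteration's payload from `prefect.context.get("task_loop_result")` and raises `prefect.engine.signals.LOOP(result=...)` to be run again. The stdlib-only sketch below emulates that contract with a plain `while` loop, so the shape is visible without Prefect installed; `process_one_group`, `run_looping_task` and the `{"index": ...}` payload are hypothetical.

```python
from typing import Any, Dict, List, Tuple

GROUPS = [[1, 3, 5], [2, 4, 6]]
processed: List[List[int]] = []


def process_id(node_id: int) -> int:
    # hypothetical per-ID work
    return node_id * 10


def process_one_group(payload: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]:
    # one "iteration" of the looping task: handle a single group, then report
    # whether another iteration is needed (the role LOOP plays in Prefect 1.x)
    index = payload.get("index", 0)
    if index >= len(GROUPS):
        return False, payload
    # the work within a group could be fanned out here (e.g. to a thread pool)
    processed.append([process_id(node_id) for node_id in GROUPS[index]])
    return True, {"index": index + 1}


def run_looping_task() -> Dict[str, Any]:
    # stands in for the engine re-running the task with the last payload
    payload: Dict[str, Any] = {}
    again = True
    while again:
        again, payload = process_one_group(payload)
    return payload


print(run_looping_task())  # {'index': 2}
print(processed)  # [[10, 30, 50], [20, 40, 60]]
```

The payload carries only the index of the next group, so each iteration is independent of how the previous one ran.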
d
I see, that's basically what I'm doing right now. I wanted to map the tasks for each group: loop through the groups, then map the tasks so that each element of a group gets its tasks run concurrently. I guess I'm doing the best I can without building the dependencies manually. I'll use a `LOOP` signal for now and look into building the edges manually if performance becomes an issue. Thank you for your help!
k
Perfect, I think you know all the ways to approach this. 👍