dario
04/25/2021, 2:52 PM
groups = [
    [1, 3, 5],
    [2, 4, 6],
]
I need to process each ID individually. The IDs within a group can be processed in parallel, but each group in the list depends on the previous group being fully processed first, like:
for group in groups:
    process_id.map(group)  # `process_id` has side effects that affect the result of the next group of IDs
Is there a way to process the groups so that each group is parallelized through map, but the groups themselves are processed sequentially?
Enda Peng
04/25/2021, 3:45 PM
flow = Flow(xxx, tasks=[group_handler_task(group) for group in groups])
Something like this
Enda Peng
04/25/2021, 3:58 PM
from time import sleep

import prefect
from prefect import Flow, task

@task
def hello_task(x):
    logger = prefect.context.get("logger")
    sleep(x)
    logger.info("Hello world {}".format(x))

with Flow("docker-example 3", executor=executor) as flow:  # `executor` is defined elsewhere
    [hello_task(x) for x in range(20)]
Enda Peng
04/25/2021, 3:59 PM
dario
04/25/2021, 4:34 PM
from functools import reduce
from typing import List

from prefect import Flow, task

@task
def get_id_groups_from_server() -> List:
    # this task gets groups of IDs from the server, so Prefect can build a hierarchical graph
    # the result would be something like:
    return [[1, 3, 5], [2, 4, 6]]  # group 0 are the children of objects in group 1 and so on

@task
def set_node_size(node):
    # the sizes of all the children need to be computed at this point
    if len(node.children) > 0:
        node.size = reduce(lambda x, y: x + y.size, node.children, 0)
    else:
        node.size = 1

with Flow("docker-example 3", executor=executor) as flow:
    groups = get_id_groups_from_server()
    processed_groups = ...  # some other tasks that process the IDs into objects
    # here's the problem: each entry in `processed_groups` is a list of all the children
    # of the nodes at the next entry of the array, so computing the size of nodes in `processed_groups[1]`
    # requires that the nodes in `processed_groups[0]` have their sizes correctly assigned
    network = set_node_size.map(processed_groups)
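The ordering constraint in this flow can be sketched outside Prefect. The following is a minimal illustration only, not Prefect API: it assumes a standard-library thread pool stands in for mapped tasks, and the `Node` class, `process_levels` helper, and sample data are hypothetical names introduced for the example. Nodes within one level run in parallel, and a level starts only after the previous level has finished.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    # Hypothetical stand-in for the objects built from the IDs.
    id: int
    children: List["Node"] = field(default_factory=list)
    size: int = 0


def set_node_size(node: Node) -> Node:
    # A leaf has size 1; otherwise sum the already computed child sizes.
    node.size = sum(child.size for child in node.children) if node.children else 1
    return node


def process_levels(levels: List[List[Node]], max_workers: int = 4) -> None:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for level in levels:
            # list() drains the iterator, so every node in this level
            # is finished before the next level starts.
            list(pool.map(set_node_size, level))


# Assumed sample data: level 0 holds the children of nodes in level 1.
leaves = [Node(1), Node(3), Node(5)]
parents = [Node(2, leaves[:2]), Node(4, leaves[2:]), Node(6)]
process_levels([leaves, parents])
print([n.size for n in parents])  # [2, 1, 1]
```

The barrier between levels comes from consuming all results of one `pool.map` call before issuing the next one, which is the behaviour the question asks of Prefect's `map`.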
dario
04/25/2021, 4:35 PM
Enda Peng
04/25/2021, 6:52 PM
edge. e.g. for
for group in groups:
    group.set_upstream(another_task)
dario
04/25/2021, 6:59 PM
class DepTask(Task):
    def run(self, groups):
        r = []
        for group in groups:
            t = set_node_size.map(group, flow=flow)
            self.set_upstream(t, flow=flow)
            r.append(t)
        return r
dario
04/25/2021, 7:00 PM
`r` doesn't get resolved before the next task, and the next task gets a list of tasks as a parameter
dario
04/25/2021, 7:02 PM
`set_node_size` tasks
dario
04/25/2021, 7:02 PM
Kevin Kho
dario
04/26/2021, 1:33 PM
`map` from another task and/or suggest an alternative way to do what I need to do? Thanks again
Kevin Kho
Kevin Kho
dario
04/26/2021, 3:12 PM
Kevin Kho