laura

11/28/2019, 3:37 PM
Hi, I would really appreciate some help with map. I need a branching workflow in which a parent produces many child tasks and each child produces many grandchild tasks. The following code does not work the way I want: GrandchildExecutor is only called 2 times (once per child), and each call receives a list of grandchild IDs rather than a single ID. I would like GrandchildExecutor to be called 4 times, which means the mapping has to be done differently. Any ideas?
from prefect import task, Task, Flow, Parameter

class ParentExecutor(Task):
    def run(self, parent_id):
        # Query the database for the child IDs belonging to this parent
        return ['child1', 'child2']

class ChildExecutor(Task):
    def run(self, child_id):
        # Query the database for the grandchild IDs belonging to this child
        return ['grandchild1', 'grandchild2']

class GrandchildExecutor(Task):
    def run(self, grandchild_id):
        # Fetch the grandchild from the database and process it
        pass

with Flow("Test branching") as test_flow:
    parent_id = Parameter('parent_id')
    parent_exec = ParentExecutor()
    child_ids = parent_exec(parent_id)
    child_exec = ChildExecutor()
    grandchild_ids = child_exec.map(child_ids)
    grandchild_exec = GrandchildExecutor()
    grandchild_exec.map(grandchild_ids)

state = test_flow.run(parent_id='parent_id123')

Jeremiah

11/28/2019, 3:42 PM
Hi Laura, in this case the result of child_exec is a list of lists, because each child produces a list of grandchild IDs. Therefore, the grandchild executor maps over each element of the “outer” list and receives a list of IDs. We would recommend introducing a “flatten” task that reduces the list of lists coming from the children into a single list of IDs. In your example it would take a list of two lists and output a list of 4 IDs. Then the grandchild executor can map over that flattened list in the way you want.
This “nested map” comes up occasionally, and we’re trying to think of a clean way to generalize it as a first-class API.
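A minimal sketch of the “flatten” step described above, written as a plain Python function so it runs without Prefect installed. The name `flatten` and the sample data are illustrative assumptions, not code from the thread; the nested list mirrors what mapping ChildExecutor over two children would produce in the example.

```python
from itertools import chain


def flatten(nested):
    """Collapse a list of lists into one flat list, preserving order."""
    return list(chain.from_iterable(nested))


# Shape of the mapped ChildExecutor result in the example: one inner list per child.
nested_ids = [
    ['grandchild1', 'grandchild2'],  # returned for child1
    ['grandchild1', 'grandchild2'],  # returned for child2
]

flat_ids = flatten(nested_ids)
print(len(flat_ids))  # 4 IDs, so a map over flat_ids would run 4 times
```

In the flow itself, this function would presumably be wrapped with the `task` decorator that the original snippet already imports and called as a regular (non-mapped) task between the two maps, for example `flat_ids = task(flatten)(grandchild_ids)` followed by `grandchild_exec.map(flat_ids)`. Treat that wiring as an untested assumption.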

laura

11/28/2019, 3:52 PM
Hi Jeremiah, thanks for the super fast reply. I was thinking about this, maybe it doesn't suit my case exactly anyway as no grandchild task can start before all the child tasks have finished, when actually there is no requirement for dependence there. Thanks for the reply, I will try the flatten idea for now 🙂

Jeremiah

11/28/2019, 3:59 PM
Yup, we call that “depth-first execution” for mapping, and supporting it is a major priority. We wrote a story with the Dask team explaining the technical limitation that prevented us from supporting it in the past, and we’re working on an architecture that will allow it.
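To make the ordering difference concrete, here is a small sequential simulation in plain Python. It does not use Prefect or Dask and ignores parallelism; `child_step`, `breadth_first`, and `depth_first` are hypothetical names introduced only for illustration. It records the order in which work would start under each strategy.

```python
def child_step(child_id):
    # Hypothetical stand-in for ChildExecutor: each child yields two grandchildren.
    return [f"{child_id}/grandchild1", f"{child_id}/grandchild2"]


def breadth_first(child_ids):
    """Finish every child before starting any grandchild."""
    order, results = [], []
    for child_id in child_ids:
        order.append(f"child:{child_id}")
        results.append(child_step(child_id))
    for grandchildren in results:
        for grandchild_id in grandchildren:
            order.append(f"grandchild:{grandchild_id}")
    return order


def depth_first(child_ids):
    """Start a child's grandchildren as soon as that child finishes."""
    order = []
    for child_id in child_ids:
        order.append(f"child:{child_id}")
        for grandchild_id in child_step(child_id):
            order.append(f"grandchild:{grandchild_id}")
    return order


print(breadth_first(['child1', 'child2']))
print(depth_first(['child1', 'child2']))
```

In the breadth-first ordering, both children appear before any grandchild, which matches the behavior Laura describes. In the depth-first ordering, the grandchildren of child1 are processed before child2 starts.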

laura

11/28/2019, 4:13 PM
Good to know, thanks for your help!