laura
11/28/2019, 3:37 PMfrom prefect import task, Task, Flow, Parameter
class ParentExecutor(Task):
    def run(self, parent_id):
        # Do query on a database to return child id's
        return ['child1', 'child2']
class ChildExecutor(Task):
    def run(self, parent_id):
        # Do query on a database to return child id's
        return ['grandchild1', 'grandchild2']
class GrandchildExecutor(Task):
    def run(self, parent_id):
        # grab grandchild from db and do processing with it...
        pass
with Flow("Test branching") as test_flow:
    parent_id = Parameter('parent_id')
    parent_exec = ParentExecutor()
    child_ids = parent_exec(parent_id)
    child_exec = ChildExecutor()
    grandchild_ids = child_exec.map(child_ids)
    grandchild_exec = GrandchildExecutor()
    grandchild_exec.map(grandchild_ids)
state = test_flow.run(parent_id='parent_id123')Jeremiah
Jeremiah
laura
11/28/2019, 3:52 PMJeremiah
Jeremiah
laura
11/28/2019, 4:13 PM