laura
11/28/2019, 3:37 PM
from prefect import task, Task, Flow, Parameter
class ParentExecutor(Task):
    """Task that resolves a parent id to the ids of its children."""

    def run(self, parent_id):
        """Return the child ids for ``parent_id``.

        Placeholder implementation: a real version would query a database.
        Here ``parent_id`` is not used and a fixed list is returned.
        """
        child_ids = ['child1', 'child2']
        return child_ids
class ChildExecutor(Task):
    """Task that resolves one id to the ids of the next generation down."""

    def run(self, parent_id):
        """Return the grandchild ids found beneath ``parent_id``.

        Placeholder implementation: a real version would query a database.
        Here ``parent_id`` is not used and a fixed list is returned.

        NOTE(review): the parameter is named ``parent_id``, but the returned
        values are grandchild ids, so the argument is presumably a child id;
        confirm against the caller before renaming.
        """
        grandchild_ids = ['grandchild1', 'grandchild2']
        return grandchild_ids
class GrandchildExecutor(Task):
    """Task intended to load a grandchild record and process it."""

    def run(self, parent_id):
        """Process the grandchild identified by ``parent_id``.

        Placeholder implementation: nothing is loaded or processed yet, and
        the method returns ``None``.
        """
        # Stub: fetching the grandchild from the database and processing it
        # is still to be written.
        return None
@task
def flatten_ids(nested_ids):
    """Collapse a list of id lists into one flat list of ids.

    Used as a reduce step between two mapped tasks: the upstream mapped task
    produces one list per input, and the downstream mapped task needs a
    single flat list to fan out over.
    """
    return [item for group in nested_ids for item in group]


# Build the flow: parent -> children -> grandchildren, fanning out at each level.
with Flow("Test branching") as test_flow:
    parent_id = Parameter('parent_id')

    parent_exec = ParentExecutor()
    child_ids = parent_exec(parent_id)

    # One ChildExecutor run per child id; each run returns a *list* of
    # grandchild ids, so the mapped result is a list of lists.
    child_exec = ChildExecutor()
    grandchild_ids = child_exec.map(child_ids)

    # Mapping directly over ``grandchild_ids`` would hand each
    # GrandchildExecutor run a whole list (one per child) instead of a single
    # grandchild id. Passing the mapped result to a non-mapped task gathers it,
    # and flattening gives one entry per grandchild to map over.
    flat_grandchild_ids = flatten_ids(grandchild_ids)

    grandchild_exec = GrandchildExecutor()
    grandchild_exec.map(flat_grandchild_ids)

# Execute the flow locally with a sample parent id.
state = test_flow.run(parent_id='parent_id123')
Jeremiah
laura
11/28/2019, 3:52 PM
Jeremiah
laura
11/28/2019, 4:13 PM