David Ojeda
05/06/2020, 9:17 AM
ValueError: A task with the slug "limit" already exists in this flow.
I could come up with a plumbing hack like this:
flow = Flow(name)
local_flow = build_local_flow() # a function that returns a flow
quetzal_flow = build_quetzal_flow() # idem
# plumbing: both local and quetzal flows have a limit parameter
limit_parameter = local_flow.get_tasks(name='limit')[0]
other_parameter = quetzal_flow.get_tasks(name='limit')[0]
for edge in quetzal_flow.edges:
    if edge.upstream_task == other_parameter:
        edge.upstream_task = limit_parameter
quetzal_flow.tasks = set(t for t in quetzal_flow.tasks if t != other_parameter)
flow.update(local_flow)
flow.update(quetzal_flow)
...
which works, but it seems very hackish and far from elegant.
Is there a cleaner alternative to this? (other than renaming the parameter, of course)
Avi A
05/06/2020, 9:38 AM
David Ojeda
05/06/2020, 9:52 AM
import prefect

@prefect.task
def generate1(limit):
    return list(range(10))[:limit]

@prefect.task
def generate2(limit):
    return list(range(100, 1000))[:limit]

@prefect.task
def echo(x):
    print('Got', x)

with prefect.Flow('flow1') as flow1:
    limit1 = prefect.Parameter('limit')
    data1 = generate1(limit=limit1)
    echo.map(x=data1)

with prefect.Flow('flow2') as flow2:
    limit2 = prefect.Parameter('limit')
    data2 = generate2(limit=limit2)
    echo.map(x=data2)

# Merging both flows fails: each one carries its own 'limit' parameter
flow3 = prefect.Flow('flow3')
flow3.update(flow1)
flow3.update(flow2)
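
The collision and the rewiring workaround discussed in this thread can be modelled without Prefect at all. The following is a minimal sketch in plain Python, not Prefect's API: `Node`, `Graph`, and `merge_sharing` are hypothetical names invented for illustration, and the duplicate-name check in `Graph.update` only imitates the behaviour reported by the ValueError above. Unlike the plumbing hack, it builds a rewired copy of the second graph instead of mutating its edges in place.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Node:
    """Hypothetical stand-in for a task: distinct nodes may share a name."""
    name: str
    uid: int


@dataclass
class Graph:
    """Hypothetical stand-in for a flow: nodes plus (upstream, downstream) edges."""
    nodes: set = field(default_factory=set)
    edges: set = field(default_factory=set)

    def update(self, other):
        # Refuse a different node that reuses a name already in this graph
        # (an imitation of the duplicate-slug error, not Prefect's own check).
        existing = {n.name for n in self.nodes}
        for node in other.nodes:
            if node not in self.nodes and node.name in existing:
                raise ValueError(f'A node named "{node.name}" already exists.')
        self.nodes |= other.nodes
        self.edges |= other.edges


def merge_sharing(target, first, second, shared_name):
    """Merge both graphs into target, reusing first's node called shared_name."""
    keep = next(n for n in first.nodes if n.name == shared_name)
    drop = next(n for n in second.nodes if n.name == shared_name)
    # Build a rewired copy of `second` rather than editing its edges in place.
    rewired = Graph(
        nodes={n for n in second.nodes if n != drop},
        edges={(keep if up == drop else up, down) for up, down in second.edges},
    )
    target.update(first)
    target.update(rewired)
    return target


limit1, gen1 = Node("limit", 1), Node("generate1", 2)
limit2, gen2 = Node("limit", 3), Node("generate2", 4)
flow1 = Graph({limit1, gen1}, {(limit1, gen1)})
flow2 = Graph({limit2, gen2}, {(limit2, gen2)})

# A naive merge hits the name collision.
collision = None
try:
    naive = Graph()
    naive.update(flow1)
    naive.update(flow2)
except ValueError as exc:
    collision = str(exc)

# The sharing merge keeps a single "limit" node feeding both generators.
merged = merge_sharing(Graph(), flow1, flow2, "limit")
```

Because the rewired copy is built before anything is merged, `flow1` and `flow2` stay untouched and can still be used on their own afterwards.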
Jeremiah
05/06/2020, 1:13 PM
David Ojeda
05/06/2020, 1:33 PM
Jeremiah
05/06/2020, 1:42 PM
David Ojeda
05/06/2020, 1:53 PM