Hi everyone!
I tried the code below, which runs correctly, but the Prefect UI shows the wrong task relationships. It also shows the wrong task_run_id on the "Task inputs" page of node_c. Could anyone help with this?
from prefect import flow, task

# Plain helper (not a task); every call returns an identical string.
def node_a():
    var = f" node_a "
    return var

@task
def start():
    return node_a()

@task
def single():
    return node_a()

@task
def node_b(i):
    var = f"{i} node_b "
    return var

@task
def node_c(i, j):
    var = f"{i}-{j} node_c "
    return var

@flow
def part():
    x = single()  # plain data, not a future
    a = start()   # plain data, same string as x
    b = node_b(a)
    c = node_c.submit(x, b, wait_for=[x, b])
    return

@flow(name="complex_dag")
def main():
    ret = part()
    print(ret)
Zanie
04/18/2023, 2:50 PM
Hey, when you're not using futures for upstreams (i.e. the return value of submit), we use the return value to infer relationships.
Zanie
04/18/2023, 2:50 PM
Since you return the same string from single and start and aren't passing futures around, we're just doing the best we can.
Zanie
04/18/2023, 2:51 PM
Note also that wait_for does nothing in your example since you're passing data (not futures).
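To make the point about identical return values concrete, here is a toy sketch in plain Python. It is not Prefect's code: the `registry` dict, the `run_task` helper, and the choice to key on `id()` of the returned object are all assumptions made only to illustrate why two tasks returning the same value can't be told apart downstream.

```python
# Toy model of value-based upstream inference. NOT Prefect's real code:
# `registry`, `run_task`, and keying on id() are illustrative assumptions.
SHARED = " node_a "  # single() and start() both hand back this same object

registry = {}  # id(return value) -> name of the task that last returned it


def run_task(name, fn, *args):
    # Guess upstreams by looking up each argument's identity in the registry.
    upstream = sorted({registry[id(a)] for a in args if id(a) in registry})
    result = fn(*args)
    # A later task returning the same object overwrites the earlier entry.
    registry[id(result)] = name
    return result, upstream


x, _ = run_task("single", lambda: SHARED)
a, _ = run_task("start", lambda: SHARED)  # replaces the entry left by "single"
b, up_b = run_task("node_b", lambda i: f"{i} node_b ", a)
c, up_c = run_task("node_c", lambda i, j: f"{i}-{j} node_c ", x, b)

print(up_b)  # ['start']
print(up_c)  # ['node_b', 'start']
```

In this sketch "single" never shows up as an upstream of "node_c", even though x came from it. Passing a future instead of the raw value gives each upstream its own distinct handle, so nothing has to be guessed from the data.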
ober
04/19/2023, 1:34 AM
Hi @Zanie!
Thank you very much for the explanation! After modifying the code as shown below, I got the task relationships I expected.
@flow
def part():
    x = single.submit()
    a = start()
    b = node_b(a)
    c = node_c(x, b)
    return