https://prefect.io logo
Title
o

ober

04/18/2023, 7:45 AM
hi everyone ! i tried the below code which run correctly, but got a wrong task relationship in prefect ui, it also got the wrong task_run_id if you check the page "Task inputs" of node_c. Is anyone could help about this ?
def node_a():
    var = f" node_a "
    return var

@task
def start():
    return node_a()

@task
def single():
    return node_a()

@task
def node_b(i):
    var = f"{i} node_b "
    return var

@task
def node_c(i, j):
    var = f"{i}-{j} node_c "
    return var

@flow
def part() :
    x = single()
    a = start()
    b = node_b(a)
    c = node_c.submit(x, b, wait_for=[x, b])
    return 

@flow(name="complex_dag")
def main():
    ret = part()
    print(ret)
1
z

Zanie

04/18/2023, 2:50 PM
Hey when you’re not using futures for upstreams (i.e. the return of submit) we use the return value to infer relationships
Since you return the same string from
single
and
start
and aren’t passing futures around, we’re just doing the best we can
Note also that
wait_for
does nothing in your example since you’re passing data (not futures)
o

ober

04/19/2023, 1:34 AM
Hi @Zanie ! i'm very appreciated for your explatination! after modified the code like below, i got the task relationships i expected, thank you very much !
@flow
def part() :
    x = single.submit()
    a = start()
    b = node_b(a)
    c = node_c(x, b)
    return
🙌 1