
Nathan Molby

02/19/2020, 3:40 PM
Hi all, the following code does not work:
from prefect import Flow, task

@task
def createX():
    return []

@task
def alterX(x):
    return x.append(5)

@task
def getZ(x):
    return len(x)

with Flow("Main Flow") as flow:
    x = createX()

    alterX(x)

    z = getZ(x, upstream_tasks=[alterX])

state = flow.run()
It should create x first, then run alterX, and then run getZ with the altered x. Instead, it tries to alter x before x has been created. I could add x as an upstream task of alterX, but I thought Prefect would do that automatically because it is a data dependency.

Alex Goodman

02/19/2020, 3:51 PM
Hey! It seems the problem is that the alterX task is being copied (see flow.visualize()). Prefect works best when data dependencies are explicitly passed and returned, so that Prefect can reason about when to pass data from task to task. In your case the reference to x is being shared and modified without Prefect being able to see this modification (i.e. the returned data from the task). Here's your code modified to explicitly denote the modification of x:
from prefect import Flow, task


@task
def createX():
    return []


@task
def alterX(x):
    x.append(5)
    return x   # <--- Prefect can now see the modifications to 'x'


@task
def getZ(x):
    return len(x)


with Flow("Main Flow") as flow:
    x = createX()
    y = alterX(x)
    z = getZ(y)

state = flow.run()
Here is the flow visualization of your original code (note the copy of alterX)
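A side note on the original alterX: it returned x.append(5), and list.append mutates the list in place and returns None, so nothing useful would have flowed downstream even with the dependency wired correctly. Below is a minimal plain-Python sketch of that difference. It does not use Prefect at all, and the function names alter_returning_append and alter_returning_list are hypothetical, chosen only for illustration:

```python
def alter_returning_append(x):
    # Mirrors the original alterX: list.append mutates x in place
    # and returns None, so the caller gets None back.
    return x.append(5)


def alter_returning_list(x):
    # Mirrors the corrected alterX: mutate, then hand the list back
    # explicitly so the caller (or a downstream task) receives it.
    x.append(5)
    return x


first = []
result_a = alter_returning_append(first)

second = []
result_b = alter_returning_list(second)

print(result_a)  # None
print(result_b)  # [5]
```

In both cases the input list is mutated, but only the second function returns a value that can be passed on as an explicit data dependency.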

Mark Koob

02/19/2020, 3:52 PM
Dang it you beat me to the punch there Alex. I was busy trying to figure out if x had to be returned in order for Prefect to see the changes or not!
😁 2

Nate Atkins

02/19/2020, 3:59 PM
Good little Daily Prefect Coding Problem. We were all on the same track. Thanks for waking my brain up today.
👍 5

Nathan Molby

02/19/2020, 4:01 PM
Thanks all, I will implement this fix!