Nathan Molby
02/19/2020, 3:40 PM
from prefect import Flow, task

@task
def createX():
    return []

@task
def alterX(x):
    return x.append(5)

@task
def getZ(x):
    return len(x)

with Flow("Main Flow") as flow:
    x = createX()
    alterX(x)
    z = getZ(x, upstream_tasks=[alterX])

state = flow.run()
It should create x first, then alter x, and then get z with the altered x. Instead, it tries to alter x before x has been created. I could add x as an upstream task of alterX, but I thought it would do that automatically because it is a data dependency.
Alex Goodman
02/19/2020, 3:51 PM
The alterX task is being copied (see flow.visualize()). Prefect works best when data dependencies are explicitly passed and returned, so that Prefect can reason about when to pass data from task to task. In your case, the reference to x is being shared and modified without Prefect being able to see the modification (i.e. it is not part of the data returned from the task).
Here's your code modified to explicitly denote the modification of x:
from prefect import Flow, task

@task
def createX():
    return []

@task
def alterX(x):
    x.append(5)
    return x  # <--- Prefect can now see the modifications to 'x'

@task
def getZ(x):
    return len(x)

with Flow("Main Flow") as flow:
    x = createX()
    y = alterX(x)
    z = getZ(y)

state = flow.run()
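A related detail, shown here as a plain-Python sketch with no Prefect involved: list.append mutates the list in place and returns None, so the original `return x.append(5)` hands None downstream rather than the list. The function names below are illustrative stand-ins, not Prefect API.

```python
# Plain-Python sketch; these functions are hypothetical stand-ins for the tasks above.

def alter_in_place(x):
    # list.append mutates x and returns None, so this function returns None.
    return x.append(5)

def alter_and_return(x):
    # Mutate, then return the list explicitly so the caller receives the data.
    x.append(5)
    return x

def get_len(x):
    return len(x)

broken = alter_in_place([])
fixed = alter_and_return([])

print(broken)          # None
print(get_len(fixed))  # 1
```

Returning the modified list is what lets the result be passed along as an ordinary value, which matches the explicit-data-dependency style described above.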
Mark Koob
02/19/2020, 3:52 PM
Nate Atkins
02/19/2020, 3:59 PM
Nathan Molby
02/19/2020, 4:01 PM