Rowen
04/13/2021, 12:54 PM
flow.run() at the end of my Python file. However, when I trigger the flow in the Prefect Cloud UI, it fails at the extract stage with the error "At least one upstream state has an unmappable result". Below is the code. I will elaborate more in the thread.
@task
def transform(x):
    return x + 30
@task
def extract():
    return [200, 400, 500]
with Flow("flow-name", storage=S3(bucket="bucket_name")) as flow:
    e = extract()
    t = transform.map(e)  # fails when i trigger flow in the UI
Rowen
04/13/2021, 12:55 PM
extract task to return only two items in a list (e.g. [200, 400]), the flow will succeed when triggered in the UI. Modifying my flow to the below will also make the flow succeed:
with Flow("flow-name", storage=S3(bucket="bucket_name")) as flow:
    t = transform.map([200, 400, 500])
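For reference, a rough sketch of the kind of check that could produce the "unmappable result" message. It assumes, and this is not confirmed anywhere in this thread, that an upstream result counts as mappable only if it supports indexing through __getitem__. The helper name is_mappable is hypothetical and not part of Prefect.

```python
def is_mappable(result):
    """Hypothetical helper (not a Prefect API).

    Assumption: a result is mappable if it supports indexing via __getitem__.
    """
    return hasattr(result, "__getitem__")


# Lists pass regardless of length, so list size alone would not trip this check.
print(is_mappable([200, 400]))       # True
print(is_mappable([200, 400, 500]))  # True

# A result that arrives as None (for example, one that was never loaded)
# or a plain integer has no __getitem__ and would be rejected.
print(is_mappable(None))             # False
print(is_mappable(230))              # False
```

Under that assumption the length of the list would not matter, which is what makes the two-item versus three-item difference surprising.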
Is there an explanation for this weird behaviour?
Rowen
04/13/2021, 12:58 PM
Zach Angell
Zach Angell
Rowen
04/14/2021, 12:55 AM
0.14.14, locally it's 0.14.15
Rowen
04/14/2021, 12:55 AM
Rowen
04/14/2021, 1:46 AM
virtualenv and poetry ... not sure if it affects Prefect
Zach Angell
0.14.15 ?
Rowen
04/15/2021, 12:06 AM
Zach Angell
0.14.15
I'm registering my flow by running the following script:
from prefect import task, Flow
from prefect.storage import S3
@task
def transform(x):
    return x + 30
@task
def extract():
    return [200, 400, 500]
with Flow("flow-name", storage=S3(bucket="zach-testing")) as flow:
    e = extract()
    t = transform.map(e)  # fails when i trigger flow in the UI
flow.register('test')
Zach Angell
virtualenv or poetry could be affecting this but you're right it shouldn't be an issue with Prefect.
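One way to compare the two setups side by side is to print the interpreter and package versions in each environment (local shell, and wherever the agent runs the flow). This is only a minimal sketch using the standard library; the function name describe_environment is hypothetical, and checking only the prefect package is an assumption about what matters here.

```python
import platform
import sys
from importlib import metadata


def describe_environment(packages=("prefect",)):
    """Collect interpreter and package versions for side-by-side comparison."""
    info = {
        "python": platform.python_version(),
        "executable": sys.executable,
    }
    for name in packages:
        try:
            info[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            info[name] = None  # package is not installed in this environment
    return info


if __name__ == "__main__":
    for key, value in describe_environment().items():
        print(f"{key}: {value}")
```

Running the same script in both places and diffing the output should show whether the Python or Prefect versions differ.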
Is there anything else about your configuration that I could be missing in my setup?
Zach Angell
Rowen
04/16/2021, 1:03 AM
Zach Angell
Rowen
04/16/2021, 1:11 AM
Python 3.9.2
Zach Angell