Rowen

    Rowen

    1 year ago
    Hi all, I am trying to run a very simple flow. It has succeeded when I run it locally by adding 
    flow.run()
     at the end of my python file. However, when I trigger the flow in the prefect cloud UI, it fails at the 
    extract
     stage, the error being 
    At least one upstream state has an unmappable result
    . Below is the code. I will elaborate more in the threads
    @task
    def transform(x):
        return x + 30
    
    
    
    @task
    def extract():
       return [200, 400, 500]
    
    
    
    with Flow("flow-name", storage=S3(bucket="bucket_name")) as flow:
        e = extract()
        t = transform.map(e) # fails when i trigger flow in the UI
    Oddly enough, the code above succeeds in my colleague's machine. Also, after various trials and errors, I realised if I modified the 
    extract
     task to return only two items in a list (e.g. 
    [200, 400]
    ), the flow will succeed when triggered in the UI. Modifying my flow to the below will also make the flow succeed:
    with Flow("flow-name", storage=S3(bucket="bucket_name")) as flow:
        t = transform.map([200, 400, 500])
    Is there an explanation for this weird behaviour? (edited)
    If it helps, I have a prefect agent running on ECS
    Zach Angell

    Zach Angell

    1 year ago
    Hi @Rowen ! I'm not sure what would be causing this weird behavior. I'm able to run the flow locally and using Prefect Cloud UI I'll set up a test with S3 storage + ECS agent just to be sure
    What versions of Prefect are you using locally and on your ECS agent?
    Rowen

    Rowen

    1 year ago
    My prefect agent uses
    0.14.14,
    locally it's
    0.14.15
    I am using on macOS Big Sur, if it helps
    also, i ran the code in isolated python environments using
    virtualenv
    and
    poetry
    ... not sure if it affects prefect
    Zach Angell

    Zach Angell

    1 year ago
    Thanks for the additional detail! Do you still see the weird behavior if you upgrade your agent to
    0.14.15
    ?
    Rowen

    Rowen

    1 year ago
    yeap it still happens 😅
    Zach Angell

    Zach Angell

    1 year ago
    😅 Hmmm I'm not able to reproduce with ECS Agent and Prefect
    0.14.15
    I'm registering my flow by running the following script
    from prefect import task, Flow
    from prefect.storage import S3
    
    @task
    def transform(x):
        return x + 30
    @task
    def extract():
       return [200, 400, 500]
    with Flow("flow-name", storage=S3(bucket="zach-testing")) as flow:
        e = extract()
        t = transform.map(e) # fails when i trigger flow in the UI
    
    flow.register('test')
    I'm using conda to manage python environments, I'll look into whether
    virtualenv
    or
    poetry
    could be affecting this but you're right it shouldn't be an issue with Prefect. Is there anything else about your configuration that I could be missing in my setup?
    From your example, it doesn't look applicable, but in case your prod flow is any different this may be relevant https://github.com/PrefectHQ/prefect/issues/3990
    Rowen

    Rowen

    1 year ago
    @Zach Angell which python version are you using for your flow code? thanks for helping out 😅 ...
    Zach Angell

    Zach Angell

    1 year ago
    Thanks for your patience! I'm using Python 3.7 in my testing, happy to test other versions
    Rowen

    Rowen

    1 year ago
    I am using
    Python 3.9.2
    Zach Angell

    Zach Angell

    1 year ago
    That could certainly cause problems. I would recommend downgrading Python versions, Prefect support for Python 3.9 is still experimental.