Thread
#prefect-community
    Rio McMahon
    7 months ago
    Hello - my flows on prefect cloud keep failing. The state message for the failed run is
    State Message: {'_schema': 'Invalid data type: None'}
    but I am unclear on what that means. Could you clarify what this might indicate or ways to get more informative error messages? Thanks.
    Anna Geller
    7 months ago
    A couple of questions that may help us find the issue:
    1. What storage do you use? Can you share your flow, or at least the storage and run config?
    2. Usually such issues occur during serialization. It might be either your flow or the flow results that cannot be serialized. If the latter, you could add
    checkpoint=False
    to your task decorator.
    3. Do you use any custom modules which are not installed within your execution environment?
    4. How did you register this flow?
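    Since serialization failures like this can be reproduced outside Prefect, a quick local check is to try pickling whatever your task returns (Prefect 1.x uses cloudpickle under the hood; the stdlib pickle shown here catches the common offenders). The `returns_unpicklable` function below is a made-up example, not from your flow:

    ```python
    import pickle

    def returns_unpicklable():
        # generators are a classic example of a result that cannot be pickled
        return (x for x in range(3))

    try:
        pickle.dumps(returns_unpicklable())
    except TypeError as exc:
        print(f"result is not picklable: {exc}")
    ```

    If the dump raises, either make the task return something serializable or set checkpoint=False so Prefect skips persisting the result.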
    Rio McMahon
    7 months ago
    Hi Anna, thanks for the quick response.
    1. I am attempting to use GitLab. The flow looks like:
    import prefect
    from prefect import task, Flow
    from prefect.storage import GitLab
    from prefect.run_configs import DockerRun
    from prefect.client import Secret
    
    from src.seasonality_index_builder_dynamic_agg import run_seasonality_index_builder_dynamic_agg
    
    # define a wrapper task to expose logging
    @task(log_stdout=True, checkpoint=False)
    def run_script():
        logger = prefect.context.get("logger")
        logger.info("Running script...")
        sf_username = Secret('snowflake_credential_username').get()
        sf_password = Secret('snowflake_credential_password').get()
        run_seasonality_index_builder_dynamic_agg(sf_username, sf_password)
    
    # instantiate the flow - we store the flow definition in gitlab
    with Flow("seasonality_index_builder",
            storage=GitLab(
                repo="repo/repo_name",
                path="flow.py",
                access_token_secret="secret_name",
                ),
            run_config=DockerRun(
                image='AWS_ACCOUNT#.dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
                )
             ) as flow:
        run_script()
    
    # Register the flow under the "tutorial" project
    flow.register(project_name="Testing")
    2. I added that per your recommendation but am still getting a run failure.
    3. I have an external script that I load via
    from src.seasonality_index_builder_dynamic_agg import run_seasonality_index_builder_dynamic_agg
    . I build a Docker container called
    seasonality_index_builder
    and store it in AWS ECR, which I am trying to use as the execution environment.
    4. I register the flow locally using
    python flow.py
    (the name of this file is
    flow.py
    )
    Anna Geller
    7 months ago
    Thanks for providing more info. Three things may help to debug this:
    1) Are you sure you committed your code to the respective GitLab repo before running the flow? It is easy to forget, and then the flow retrieved from storage is a different version than the one used at registration. It would be good to cross-check that the registered flow version matches your flow in GitLab.
    2) Can you try running the same flow (including all your logic here and the same Flow structure) but with defaults for storage and run config, i.e. using local storage and a local agent?
    import prefect
    from prefect import task, Flow
    from prefect.client import Secret
    
    from src.seasonality_index_builder_dynamic_agg import (
        run_seasonality_index_builder_dynamic_agg,
    )
    
    # define a wrapper task to expose logging
    @task(log_stdout=True, checkpoint=False)
    def run_script():
        logger = prefect.context.get("logger")
        logger.info("Running script...")
        sf_username = Secret("snowflake_credential_username").get()
        sf_password = Secret("snowflake_credential_password").get()
        run_seasonality_index_builder_dynamic_agg(sf_username, sf_password)
    
    
    # instantiate the flow with default (local) storage
    with Flow("seasonality_index_builder") as flow:
        run_script()
    Then, you can register this flow using the CLI:
    prefect register --project Testing -p flow.py
    3) Can you try testing your DockerRun and GitLab storage configuration using a simple hello-world flow? This way you can check whether your storage and run config work, or whether the issue is in your flow:
    from prefect import task, Flow
    from prefect.storage import GitLab
    from prefect.run_configs import DockerRun
    
    @task(log_stdout=True)
    def hello_world():
        print("hello world")
    
    
    with Flow("hello", storage=GitLab(
                repo="repo/repo_name",
                path="flow.py",
                access_token_secret="secret_name",
                ),
                run_config=DockerRun(
                image='AWS_ACCOUNT#.dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
                )
    ) as flow:
        hw = hello_world()
    Rio McMahon
    7 months ago
    1. I verified that my remote repo matched the state of the repo when I registered the flow.
    2. I am able to run the flow locally using a local agent.
    3. I tried to run the simple flow using this code:
    import prefect
    from prefect import task, Flow
    from prefect.storage import GitLab
    from prefect.run_configs import DockerRun
    
    @task(log_stdout=True)
    def say_hello():
        print("hello world")
    
    with Flow("hello-gitlab-flow",
            storage=GitLab(
                repo="rio.mcmahon/prefect_test",
                path="hello_cloud_flow.py",
                access_token_secret="[secret_name]"  # this is generated within gitlab and stored in prefect cloud
                ),
            run_config=DockerRun(
                image='[account_number].dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
                )
            ) as flow:
        hw = say_hello()
    And the run failed with
    State Message: {'_schema': 'Invalid data type: None'}
    . I then tried to run it with 1) just GitLab storage (no
    run_config
    ) and then 2) local storage (no
    storage
    or
    run_config
    options set). The second run looks like:
    import prefect
    from prefect import task, Flow
    from prefect.storage import GitLab
    from prefect.run_configs import DockerRun
    
    @task(log_stdout=True)
    def say_hello():
        print("hello world")
    
    with Flow("hello-gitlab-flow",
    #        storage=GitLab(
    #            repo="rio.mcmahon/prefect_test",
    #            path="hello_cloud_flow.py",
    #            access_token_secret="[secret_name]"  # this is generated within gitlab and stored in prefect cloud
    #            ),
    #        run_config=DockerRun(
    #            image='[account_number].dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
    #            )
            ) as flow:
        hw = say_hello()
    And both of these registered flows fail with the same
    State Message: {'_schema': 'Invalid data type: None'}
    error. The agent that is picking up these flows is a FargateAgent - could that have something to do with it?
    Anna Geller
    7 months ago
    Yes! Thanks for sharing this important detail 😄 FargateAgent is deprecated in favor of ECSAgent. Also, if you are using Fargate, you should use ECSRun run config rather than DockerRun. If you need more tutorials or examples for Fargate, check out this blog and here are some examples with ECSRun and various storage mechanisms. I think especially those code examples may be helpful, so feel free to ignore the blog 🙂
    Rio McMahon
    7 months ago
    Sorry for the novice mistake - still figuring out Prefect. If I try to run this flow:
    import prefect
    from prefect import task, Flow
    from prefect.storage import GitLab
    from prefect.run_configs import ECSRun
    
    @task(log_stdout=True)
    def say_hello():
        print("hello world")
    
    with Flow("hello-gitlab-flow",
            run_config=ECSRun()) as flow:
        hw = say_hello()
    I am still getting the same
    State Message: {'_schema': 'Invalid data type: None'}
    error. Do you have any ideas on what might cause this or how to debug it?
    Anna Geller
    7 months ago
    Yes, in that case it might just be an old Prefect version. Can you try upgrading to the latest?
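    A minimal upgrade check might look like this (assuming a pip-based install; adjust for conda or baked Docker images, and note that the agent environment, the execution image, and the machine you register from should all run matching versions):

    ```shell
    # upgrade prefect in this environment; repeat in the agent
    # environment and in the Docker image used by the run config
    pip install --upgrade prefect

    # confirm the installed version
    prefect version
    ```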
    Rio McMahon
    7 months ago
    I just looked and the existing agent is version 13.9 while I was using the current Prefect version. I'm getting more sane log messages now. Thanks for all the help - sorry for all the red herrings when it was just a 2-year-old agent.