Rio McMahon

02/17/2022, 8:54 PM
Hello - my flows on prefect cloud keep failing. The state message for the failed run is
State Message: {'_schema': 'Invalid data type: None'}
but I am unclear on what that means. Could you clarify what this might indicate or ways to get more informative error messages? Thanks.

Anna Geller

02/17/2022, 9:34 PM
A couple of questions that may help us find the issue:
1. What storage do you use? Can you share your flow, or at least the storage and run config?
2. Usually such issues occur during serialization. It might be either your flow or your flow results that cannot be serialized. If the latter, you could add checkpoint=False to your task decorator.
3. Do you use any custom modules which are not installed within your execution environment?
4. How did you register this flow?
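A minimal sketch of the serialization check described in point 2, using only the standard library. It assumes task results are serialized pickle-style; `is_picklable` and the sample values are hypothetical, and Prefect's own serializer may accept or reject different objects than plain `pickle` does.

```python
import pickle
import threading


def is_picklable(value) -> bool:
    """Return True if the standard-library pickle module can serialize value."""
    try:
        pickle.dumps(value)
        return True
    except Exception:  # pickle raises several different exception types
        return False


# Plain data such as a dict of numbers and strings serializes fine.
plain_result = {"rows": 3, "status": "ok"}

# A lock stands in for objects such as open connections, which cannot be pickled.
unpicklable_result = threading.Lock()

print(is_picklable(plain_result))
print(is_picklable(unpicklable_result))
```

If a task's return value fails a check like this, returning plain data instead, or disabling checkpointing for that task as suggested above, are the two options to try.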

Rio McMahon

02/17/2022, 9:43 PM
Hi Anna, thanks for the quick response. 1. I am attempting to use GitLab. The flow looks like:
import prefect
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import DockerRun
from prefect.client import Secret

from src.seasonality_index_builder_dynamic_agg import run_seasonality_index_builder_dynamic_agg

# define a wrapper task to expose logging
@task(log_stdout=True, checkpoint=False)
def run_script():
    logger = prefect.context.get("logger")
    logger.info("Running script...")
    sf_username = Secret('snowflake_credential_username').get()
    sf_password = Secret('snowflake_credential_password').get()
    run_seasonality_index_builder_dynamic_agg(sf_username, sf_password)

# instantiate the flow - we store the flow definition in gitlab
with Flow("seasonality_index_builder",
        storage=GitLab(
            repo="repo/repo_name",
            path="flow.py",
            access_token_secret="secret_name",
            ),
        run_config=DockerRun(
            image='AWS_ACCOUNT#.dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
            )
         ) as flow:
    run_script()

# Register the flow under the "Testing" project
flow.register(project_name="Testing")
2. I added that per your recommendation but am still getting a run failure.
3. I have an external script that I load via the import from src.seasonality_index_builder_dynamic_agg import run_seasonality_index_builder_dynamic_agg. I build a Docker container called seasonality_index_builder and store it in AWS ECR, which I am trying to use as the environment.
4. I register the flow locally using python flow.py (the name of this file is flow.py).

Anna Geller

02/17/2022, 9:56 PM
Thanks for providing more info. Three things may help to debug this:
1) Are you sure you committed your code to the respective GitLab repo before running the flow? It is easy to forget, and then the flow run retrieves a flow from storage that differs from the version used at registration. So it would be good to cross-check that the registered flow version matches your flow in GitLab.
2) Can you try running the same flow (including all your logic and the same Flow structure) but with defaults for storage and run config, i.e. using local storage and a local agent?
import prefect
from prefect import task, Flow
from prefect.client import Secret

from src.seasonality_index_builder_dynamic_agg import (
    run_seasonality_index_builder_dynamic_agg,
)

# define a wrapper task to expose logging
@task(log_stdout=True, checkpoint=False)
def run_script():
    logger = prefect.context.get("logger")
    logger.info("Running script...")
    sf_username = Secret("snowflake_credential_username").get()
    sf_password = Secret("snowflake_credential_password").get()
    run_seasonality_index_builder_dynamic_agg(sf_username, sf_password)


# instantiate the flow with the default (local) storage and run config
with Flow("seasonality_index_builder") as flow:
    run_script()
Then, you can register this flow using the CLI:
prefect register --project Testing -p flow.py
3) Can you try testing your DockerRun and GitLab storage configuration using a simple hello-world flow? This way you can check whether your storage and run config work or whether the issue is in your flow:
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import DockerRun

@task(log_stdout=True)
def hello_world():
    print("hello world")


with Flow(
    "hello",
    storage=GitLab(
        repo="repo/repo_name",
        path="flow.py",
        access_token_secret="secret_name",
    ),
    run_config=DockerRun(
        image='AWS_ACCOUNT#.dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
    ),
) as flow:
    hw = hello_world()

Rio McMahon

02/17/2022, 10:45 PM
1. I verified that my remote repo matched the state of the repo when I registered the agent. 2. I am able to run the flow locally using a local agent. 3. I tried to run the simple agent using this code:
import prefect
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import DockerRun

@task(log_stdout=True)
def say_hello():
    print("hello world")

with Flow("hello-gitlab-flow",
        storage=GitLab(
            repo="rio.mcmahon/prefect_test",
            path="hello_cloud_flow.py",
            access_token_secret="[secret_name]"  # this is generated within gitlab and stored in prefect cloud
            ),
        run_config=DockerRun(
            image='[account_number].dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
            )
        ) as flow:
    hw = say_hello()
And the run failed with State Message: {'_schema': 'Invalid data type: None'}. I then tried to run it 1) with just GitLab storage (no run_config) and then 2) using local storage (no storage or run_config options set). The second run looks like:
import prefect
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import DockerRun

@task(log_stdout=True)
def say_hello():
    print("hello world")

with Flow("hello-gitlab-flow",
#        storage=GitLab(
#            repo="rio.mcmahon/prefect_test",
#            path="hello_cloud_flow.py",
#            access_token_secret="[secret_name]"  # this is generated within gitlab and stored in prefect cloud
#            ),
#        run_config=DockerRun(
#            image='[account_number].dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
#            )
        ) as flow:
    hw = say_hello()
And both of these registered flows fail with the same State Message: {'_schema': 'Invalid data type: None'} error. The agent that is picking up these flows is a FargateAgent - could that have something to do with it?

Anna Geller

02/17/2022, 10:54 PM
Yes! Thanks for sharing this important detail 😄 FargateAgent is deprecated in favor of ECSAgent. Also, if you are using Fargate, you should use ECSRun run config rather than DockerRun. If you need more tutorials or examples for Fargate, check out this blog and here are some examples with ECSRun and various storage mechanisms. I think especially those code examples may be helpful, so feel free to ignore the blog 🙂

Rio McMahon

02/17/2022, 11:06 PM
Sorry for the novice mistake - still figuring out prefect. If I try and run this flow:
import prefect
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import ECSRun

@task(log_stdout=True)
def say_hello():
    print("hello world")

with Flow("hello-gitlab-flow",
        run_config=ECSRun()) as flow:
    hw = say_hello()
I am still getting the same State Message: {'_schema': 'Invalid data type: None'} error. Do you have any ideas on what might cause this or how to debug it?

Anna Geller

02/17/2022, 11:49 PM
Yes, in that case this might be just some old Prefect version. Can you try to upgrade to latest?

Rio McMahon

02/18/2022, 12:23 AM
I just looked and the existing agent is version 13.9 and I was using the current prefect version. I'm getting more sane log messages now. Thanks for all the help - sorry for all the red herrings when it was just a 2-year-old agent.
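Since the root cause was an agent much older than the Prefect version used to register the flow, a quick comparison of the two version strings can catch this early. The sketch below is a hypothetical helper, not part of Prefect: `parse_version` and `agent_is_older` are made-up names, and the version strings are illustrative values rather than the ones from this thread. On the registration side the installed version can be read from `prefect.__version__`; how you look up the agent's version depends on your setup.

```python
import re


def parse_version(version: str) -> tuple:
    """Parse a dotted version such as '0.13.9' into a tuple of ints, e.g. (0, 13, 9)."""
    numbers = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break  # stop at the first non-numeric component
        numbers.append(int(match.group()))
    # Pad to at least three components so that '1.0' and '1.0.0' compare as equal.
    while len(numbers) < 3:
        numbers.append(0)
    return tuple(numbers)


def agent_is_older(agent_version: str, flow_version: str) -> bool:
    """Return True if the agent runs an older release than the one that registered the flow."""
    return parse_version(agent_version) < parse_version(flow_version)


# Illustrative values only: an old agent versus a newer client library.
print(agent_is_older("0.13.9", "0.15.13"))
```

If this reports an older agent, upgrading the agent's environment to match the registering client is the first thing to try before digging into storage or run config.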