Rio McMahon
02/17/2022, 8:54 PM
State Message: {'_schema': 'Invalid data type: None'}
but I am unclear on what that means. Could you clarify what this might indicate, or suggest ways to get more informative error messages? Thanks.

Anna Geller
checkpoint=False to your task decorator
3. Do you use any custom modules which are not installed within your execution environment?
4. How did you register this flow?

Rio McMahon
02/17/2022, 9:43 PM
import prefect
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import DockerRun
from prefect.client import Secret
from src.seasonality_index_builder_dynamic_agg import run_seasonality_index_builder_dynamic_agg
# define a wrapper task to expose logging
@task(log_stdout=True, checkpoint=False)
def run_script():
    logger = prefect.context.get("logger")
    logger.info("Running script...")
    sf_username = Secret('snowflake_credential_username').get()
    sf_password = Secret('snowflake_credential_password').get()
    run_seasonality_index_builder_dynamic_agg(sf_username, sf_password)
# instantiate the flow - we store the flow definition in gitlab
with Flow("seasonality_index_builder",
          storage=GitLab(
              repo="repo/repo_name",
              path="flow.py",
              access_token_secret="secret_name",
          ),
          run_config=DockerRun(
              image='AWS_ACCOUNT#.dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
          )
) as flow:
    run_script()

# Register the flow under the "tutorial" project
flow.register(project_name="Testing")
2. I added that per your recommendation but am still getting a run failure.
3. I have an external script that I load via from src.seasonality_index_builder_dynamic_agg import run_seasonality_index_builder_dynamic_agg. I build a Docker container called seasonality_index_builder and store it in AWS ECR, which I am trying to use as the execution environment.
4. I register the flow locally using python flow.py (the name of this file is flow.py).

Anna Geller
import prefect
from prefect import task, Flow
from prefect.client import Secret
from src.seasonality_index_builder_dynamic_agg import (
run_seasonality_index_builder_dynamic_agg,
)
# define a wrapper task to expose logging
@task(log_stdout=True, checkpoint=False)
def run_script():
    logger = prefect.context.get("logger")
    logger.info("Running script...")
    sf_username = Secret("snowflake_credential_username").get()
    sf_password = Secret("snowflake_credential_password").get()
    run_seasonality_index_builder_dynamic_agg(sf_username, sf_password)
# instantiate the flow (no storage or run_config set here)
with Flow("seasonality_index_builder") as flow:
    run_script()
Then, you can register this flow using the CLI:
prefect register --project Testing -p flow.py
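A related note on the python flow.py registration path: calling flow.register() at the top level of the file means that any later import of the module (for example, when storage loads flow.py in the execution environment) would run the registration call again. A common pattern is to guard it behind a main-module check. The sketch below illustrates the guard only; register() here is a hypothetical stand-in, since real code would call flow.register(...) from an environment with Prefect installed.

```python
# Sketch only: register() is a stand-in for flow.register(...),
# so this illustration runs without Prefect installed.
def register(project_name):
    # real code would be: flow.register(project_name=project_name)
    return f"registered under project '{project_name}'"

# Guard registration so that merely importing this module
# (e.g. when storage loads flow.py on the agent) does not
# re-register the flow:
if __name__ == "__main__":
    print(register(project_name="Testing"))
```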
3) Can you try testing your DockerRun and GitLab storage configuration using a simple hello-world flow? This way you can check whether your storage and run config work, or whether the issue is in your flow:
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import DockerRun
@task(log_stdout=True)
def hello_world():
    print("hello world")

with Flow("hello",
          storage=GitLab(
              repo="repo/repo_name",
              path="flow.py",
              access_token_secret="secret_name",
          ),
          run_config=DockerRun(
              image='AWS_ACCOUNT#.dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
          )
) as flow:
    hw = hello_world()
Rio McMahon
02/17/2022, 10:45 PM
import prefect
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import DockerRun
@task(log_stdout=True)
def say_hello():
    print("hello world")

with Flow("hello-gitlab-flow",
          storage=GitLab(
              repo="rio.mcmahon/prefect_test",
              path="hello_cloud_flow.py",
              access_token_secret="[secret_name]",  # this is generated within gitlab and stored in prefect cloud
          ),
          run_config=DockerRun(
              image='[account_number].dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
          )
) as flow:
    hw = say_hello()
And the run failed with State Message: {'_schema': 'Invalid data type: None'}.
I then tried to run it 1) with just GitLab storage (no run_config) and then 2) using local storage (no storage or run_config options set). The second run looks like:
import prefect
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import DockerRun
@task(log_stdout=True)
def say_hello():
    print("hello world")

with Flow("hello-gitlab-flow",
          # storage=GitLab(
          #     repo="rio.mcmahon/prefect_test",
          #     path="hello_cloud_flow.py",
          #     access_token_secret="[secret_name]",  # this is generated within gitlab and stored in prefect cloud
          # ),
          # run_config=DockerRun(
          #     image='[account_number].dkr.ecr.us-east-2.amazonaws.com/seasonality_index_builder:latest',
          # )
) as flow:
    hw = say_hello()
And both of these registered flows fail with the same State Message: {'_schema': 'Invalid data type: None'} error.
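For context on the error shape: Prefect 1.x serializes flow metadata with marshmallow-style schemas, and a top-level "_schema" key (rather than a field-specific one) generally means the payload being validated had the wrong type altogether, such as None. A version mismatch between the environment that registered the flow and the one running it is one way this can happen. The snippet below is a simplified, hypothetical illustration of how that error shape arises; it is not Prefect's actual code.

```python
# Simplified sketch of schema-style validation (not Prefect's actual
# implementation): a top-level "_schema" error is reported when the
# payload itself has the wrong type, e.g. None instead of a dict,
# so there is no specific field to attach the error to.
def validate_payload(payload):
    errors = {}
    if not isinstance(payload, dict):
        # top-level failure: nothing field-specific to report
        errors["_schema"] = f"Invalid data type: {payload}"
    return errors

print(validate_payload(None))  # {'_schema': 'Invalid data type: None'}
print(validate_payload({"name": "hello-gitlab-flow"}))  # {}
```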
The agent that is picking up these flows is a FargateAgent - could that have something to do with it?

Anna Geller
Rio McMahon
02/17/2022, 11:06 PM
import prefect
from prefect import task, Flow
from prefect.storage import GitLab
from prefect.run_configs import ECSRun
@task(log_stdout=True)
def say_hello():
    print("hello world")

with Flow("hello-gitlab-flow",
          run_config=ECSRun()) as flow:
    hw = say_hello()
I am still getting the same State Message: {'_schema': 'Invalid data type: None'} error. Do you have any ideas on what might cause this or how to debug it?

Anna Geller
Rio McMahon
02/18/2022, 12:23 AM