
Zhibin Dai

03/03/2022, 2:31 PM
Hi. I have a flow that copies data from S3 to Snowflake. It works fine locally, but when I run it in ECS using Fargate, I'm getting a strange error.
Failed to load and execute flow run: ImportError("cannot import name 'List' from 'pip' (/usr/local/lib/python3.7/site-packages/pip/__init__.py)")
this is the flow code.
FLOW_NAME = 's3_to_sf_raw'
STORAGE = GitHub(
	repo=REPO,
	path=f"flows/ppi/{FLOW_NAME}.py",
	access_token_secret=PREFECT_GITHUB_ACCESS_TOKEN_NAME,
	ref=GITHUB_BRANCH_NAME
)
RUN_CONFIG = set_run_config(run_config_type=RUN_CONFIG_TYPE)
logger = prefect.context.get('logger')


with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
	dataset_name = Parameter(PREFECT_DATASET_PARAM_NAME)
	run_env = Parameter(PREFECT_RUN_ENVIRONMENT_PARAM_NAME, default='dev')
	run_env_upper = "DEV"

	sf_user = PrefectSecret(SNOWFLAKE_PREFECT_USERNAME.format(ENVIRONMENT=run_env_upper))
	sf_pwd = PrefectSecret(SNOWFLAKE_PREFECT_PASSWORD.format(ENVIRONMENT=run_env_upper))
	sf_account = PrefectSecret(SNOWFLAKE_ACCOUNT_ID.format(ENVIRONMENT=run_env_upper))

	snowflake_copy = get_dict_kwargs(
		account=sf_account,
		user=sf_user,
		password=sf_pwd,
		database=SNOWFLAKE_RAW_DATABASE,
		schema=SNOWFLAKE_DEFAULT_SCHEMA,
		role=SNOWFLAKE_LOADER_ROLE,
		warehouse=SNOWFLAKE_LOADER_WAREHOUSE,
		table=dataset_name + "_RAW",
		stage=SNOWFLAKE_PROPERTY_RAW_STAGE + "_" + run_env,
		file_format="jsonf"
	)

	copy_run = run_snowflake_copy(snowflake_copy, dataset_name)
	copy_run_out = print_prefect_output(copy_run)


flow.set_reference_tasks([copy_run])

Kevin Kho

03/03/2022, 2:33 PM
You made your own image? Could I see the Dockerfile?

Zhibin Dai

03/03/2022, 2:33 PM
very simple
FROM prefecthq/prefect:latest

WORKDIR /app

RUN mkdir .dbt

ADD . .
RUN pip install .

Kevin Kho

03/03/2022, 2:36 PM
That is so weird. I guess I would try upgrading pip in the image?

Zhibin Dai

03/03/2022, 3:14 PM
It's working now, upgrading pip did it. Not sure what the issue was.

Kevin Kho

03/03/2022, 3:32 PM
me too

Zhibin Dai

03/03/2022, 3:33 PM
Another related question. When the copy command runs successfully but no records are loaded (because there's no new data), the flow is still marked successful. Is there a way to configure this so that it returns a failure or some other output, so that we can easily see that no records were loaded?

Kevin Kho

03/03/2022, 3:34 PM
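from prefect import task
from prefect.engine.signals import FAIL

@task
def something(cond):
    # Raising the FAIL signal marks this task run as Failed
    if cond:
        raise FAIL()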

Zhibin Dai

03/03/2022, 3:34 PM
How would we check the condition in this case?

Kevin Kho

03/03/2022, 3:34 PM
where FAIL is the signal
The length of the records object?

Zhibin Dai

03/03/2022, 3:36 PM
OK, I'll try that, thanks.
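
A minimal sketch of the check suggested above, assuming the copy task returns a sized collection of loaded records (the thread does not show what run_snowflake_copy actually returns, so that shape is an assumption). The helper name ensure_records_loaded is hypothetical. FAIL is imported from prefect.engine.signals as in Prefect 1.x; the fallback class exists only so the sketch runs without Prefect 1.x installed.

```python
try:
    # Prefect 1.x ships its state signals in this module
    from prefect.engine.signals import FAIL
except ImportError:
    # Stand-in so the sketch still runs when Prefect 1.x is not installed
    class FAIL(Exception):
        pass


def ensure_records_loaded(records):
    """Raise FAIL when the copy step returned no records.

    `records` is assumed to be None or a sized collection such as a list of rows.
    """
    if records is None or len(records) == 0:
        raise FAIL("COPY succeeded but loaded no records")
    # Return the count so a downstream task can log or print it
    return len(records)
```

In a Prefect 1.x flow this function would presumably be decorated with @task and called with copy_run inside the Flow block, so that an empty result fails that task run instead of passing silently.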