Zhibin Dai
03/03/2022, 2:31 PM
Failed to load and execute flow run: ImportError("cannot import name 'List' from 'pip' (/usr/local/lib/python3.7/site-packages/pip/__init__.py)")
FLOW_NAME = 's3_to_sf_raw'

# Storage definition: the flow file path is derived from FLOW_NAME.
STORAGE = GitHub(
    repo=REPO,
    path=f"flows/ppi/{FLOW_NAME}.py",
    access_token_secret=PREFECT_GITHUB_ACCESS_TOKEN_NAME,
    ref=GITHUB_BRANCH_NAME,
)
RUN_CONFIG = set_run_config(run_config_type=RUN_CONFIG_TYPE)

logger = prefect.context.get('logger')

with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    # Run-time inputs: the dataset to load and the target environment.
    dataset_name = Parameter(PREFECT_DATASET_PARAM_NAME)
    run_env = Parameter(PREFECT_RUN_ENVIRONMENT_PARAM_NAME, default='dev')

    # NOTE(review): this is a fixed literal, not derived from ``run_env``,
    # so the secret names below are always formatted with "DEV".
    run_env_upper = "DEV"
    sf_user = PrefectSecret(
        SNOWFLAKE_PREFECT_USERNAME.format(ENVIRONMENT=run_env_upper)
    )
    sf_pwd = PrefectSecret(
        SNOWFLAKE_PREFECT_PASSWORD.format(ENVIRONMENT=run_env_upper)
    )
    sf_account = PrefectSecret(
        SNOWFLAKE_ACCOUNT_ID.format(ENVIRONMENT=run_env_upper)
    )

    # Table and stage names are built from the run-time parameters.
    raw_table = dataset_name + "_RAW"
    raw_stage = SNOWFLAKE_PROPERTY_RAW_STAGE + "_" + run_env

    snowflake_copy = get_dict_kwargs(
        account=sf_account,
        user=sf_user,
        password=sf_pwd,
        database=SNOWFLAKE_RAW_DATABASE,
        schema=SNOWFLAKE_DEFAULT_SCHEMA,
        role=SNOWFLAKE_LOADER_ROLE,
        warehouse=SNOWFLAKE_LOADER_WAREHOUSE,
        table=raw_table,
        stage=raw_stage,
        file_format="jsonf",
    )

    copy_run = run_snowflake_copy(snowflake_copy, dataset_name)
    copy_run_out = print_prefect_output(copy_run)

# Only the copy task is registered as a reference task for the flow.
flow.set_reference_tasks([copy_run])
Kevin Kho
Zhibin Dai
03/03/2022, 2:33 PM
FROM prefecthq/prefect:latest
# Use /app as the working directory for the instructions that follow.
WORKDIR /app
# Create a .dbt directory relative to the working directory.
RUN mkdir .dbt
# Copy the build context into the working directory. COPY is used instead of
# ADD because only a plain local copy is needed here.
COPY . .
# Install the copied project with pip.
RUN pip install .
Kevin Kho
Zhibin Dai
03/03/2022, 3:14 PM
Kevin Kho
Zhibin Dai
03/03/2022, 3:33 PM
Kevin Kho
@task
def something():
    """Example task: raise the ``FAIL`` signal when ``cond`` is truthy."""
    # ``cond`` and ``FAIL`` are not defined in this snippet; they must be
    # supplied by the surrounding code.
    if cond:
        raise FAIL()
Zhibin Dai
03/03/2022, 3:34 PM
Kevin Kho
Zhibin Dai
03/03/2022, 3:36 PM