Charles Liu
03/16/2021, 8:28 PMTraceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 245, in run
parameters=parameters,
File "/usr/local/lib/python3.6/site-packages/prefect/engine/cloud/flow_runner.py", line 402, in initialize_run
raise KeyError(msg) from exc
KeyError: 'Task slug CRED_secrets-1 not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.'
Zanie
-1
. Can you try to register your flow in a __main__
check block like:
if __name__ == "__main__":
flow.register(...)
Zanie
Charles Liu
03/16/2021, 8:47 PMfrom prefect.run_configs import KubernetesRun
from prefect.storage import CodeCommit, S3
from prefect import Flow, task
from custom_package import task_runner as custom_package_task_runner
import logging
import sys
from prefect.tasks.secrets import PrefectSecret
@task(log_stdout=True)
def task_extract(api_creds):
my_runner = custom_package_task_runner.custom_packageRunner()
config = {
#config stuff here
}
df_e = my_runner.custom_package_extract(config=config)
print(df_e)
return df_e
@task(log_stdout=True)
def task_transform(df_e):
my_runner = custom_package_task_runner.custom_packageRunner()
transform_config = {
#config stuff here
}
df_t = my_runner.custom_package_transform(df_e, transform_config)
print(df_t)
return df_t
@task(log_stdout=True)
def task_stage(df_t):
my_runner = custom_package_task_runner.custom_packageRunner()
stage_config = {
#config stuff here
}
s3_path = my_runner.custom_package_stage(df_t, stage_config)
print(s3_path)
return s3_path
@task(log_stdout=True)
def task_load(s3_path):
my_runner = custom_package_task_runner.custom_packageRunner()
load_config = {
#config stuff here
file_steps = my_runner.custom_package_load(load_config)
print(file_steps)
return file_steps
STORAGE = CodeCommit(repo="custom_package_prefect",
path="/internal_package/EKS_internal_package.py",
commit='master',
secrets=["AWS_CREDENTIALS"])
RUN_CONFIG = KubernetesRun(image="IMAGE_URL",
image_pull_secrets=["AWS_CREDENTIALS"])
with Flow("EKS-codecommit", storage=STORAGE, run_config=RUN_CONFIG) as flow:
client_secret = PrefectSecret("internal_package_secrets")
first_step = task_extract(api_creds=client_secret)
second_step = task_transform(first_step)
third_step = task_stage(second_step)
fourth_step = task_load(third_step)
if __name__ == "__main__":
flow.register(project_name="EKS-codecommit-test")
Charles Liu
03/16/2021, 8:51 PMCharles Liu
03/16/2021, 8:52 PMZanie
Zanie
Charles Liu
03/16/2021, 8:53 PMZanie
CodeCommit
/ repo?Zanie
Charles Liu
03/16/2021, 8:55 PMZanie
Charles Liu
03/16/2021, 8:56 PMCharles Liu
03/16/2021, 8:57 PMCharles Liu
03/16/2021, 9:01 PMZanie