Wesley Jin
03/01/2022, 11:45 PMflow.register
vs. on the run_config
object e.g. ECSRun
? If not, is there a preferred place to set it?Anna Geller
03/01/2022, 11:53 PMfrom prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
schedule = Schedule(clocks=[CronClock("0 0 * * *", labels=["some-label"])])
Wesley Jin
03/02/2022, 12:01 AMAnna Geller
03/02/2022, 12:01 AMfrom prefect.run_configs import LocalRun, KubernetesRun, RunConfig
from prefect.storage.github import GitHub
from prefect.client.secrets import Secret
def set_run_config(local: bool = False) -> RunConfig:
if local:
return LocalRun(labels=["dev"])
aws_account_id = Secret("AWS_ACCOUNT_ID").get()
return KubernetesRun(
labels=["prod"],
image=f"{aws_account_id}.<http://dkr.ecr.us-east-1.amazonaws.com/prefect-dbt-k8s-snowflake:latest|dkr.ecr.us-east-1.amazonaws.com/prefect-dbt-k8s-snowflake:latest>",
image_pull_policy="IfNotPresent",
)
def set_storage(flow_name: str) -> GitHub:
return GitHub(
repo="anna-geller/prefect-dbt-k8s-snowflake",
path=f"flows/{flow_name}.py",
access_token_secret="GITHUB_ACCESS_TOKEN",
)
Usage in a Flow:
from prefect import Flow
from flow_utilities.prefect_configs import set_run_config, set_storage
FLOW_NAME = "02_dbt_snowflake"
with Flow(FLOW_NAME,
storage=set_storage(FLOW_NAME),
run_config=set_run_config(),
) as flow:
...