Slackbot
07/26/2022, 3:15 PMNate
07/26/2022, 3:16 PMSebastián Montoya Tapia
07/26/2022, 3:19 PMfrom datetime import datetime
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret
from tasks.extract import function_1, function_2
from tasks.load import function_1, function_2
from prefect.storage import S3
from prefect.run_configs import ECSRun
import os
ENV_1 = os.environ.get('ENV_1')
ENV_2 = os.environ.get('ENV_2')
AWS_IAM_ID = os.environ.get('AWS_IAM_ID')
FLOW_NAME = "flow-name"
IMAGE_NAME= "flow-name-image"
STORAGE = S3(
bucket="fantasy-bucket",
key=f"{IMAGE_NAME}/flow.py",
stored_as_script=True,
# this will ensure to upload the Flow script to S3 during registration
local_script_path=f"flow/src/flow.py",
)
RUN_CONFIG = ECSRun(
labels=["super-label"],
task_role_arn=f"arn:aws:iam::{AWS_IAM_ID}:role/fantasyTaskRole",
execution_role_arn=f"arn:aws:iam::{AWS_IAM_ID}:role/fantasyExecutionRole",
run_task_kwargs=dict(cluster="superCluster", launchType="FARGATE",),
env={
"PREFECT__CONTEXT__SECRETS__ENV_1": ENV_1,
"PREFECT__CONTEXT__SECRETS__ENV_2": ENV_2
},
image=f"{AWS_IAM_ID}.<http://dkr.ecr.any-az.amazonaws.com/{IMAGE_NAME}:latest|dkr.ecr.any-az.amazonaws.com/{IMAGE_NAME}:latest>"
)
with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
# Lots of tasks in here
super_taks = super_task_function()
# Register the flow
flow.register("ProjectName")
Nate
07/26/2022, 3:29 PMSebastián Montoya Tapia
07/26/2022, 3:30 PMFROM prefecthq/prefect:1.2.4-python3.8
RUN apt-get update && apt-get upgrade -y
# package for psycopg2
RUN apt-get install libpq-dev -y
RUN /usr/local/bin/python -m pip install --upgrade pip
WORKDIR /opt/prefect
COPY src/ /opt/prefect/flow/src/
COPY requirements.txt .
COPY setup.py .
RUN pip install .
Nate
07/26/2022, 5:34 PMSebastián Montoya Tapia
07/26/2022, 5:37 PM