Ievgenii Martynenko
05/04/2022, 12:45 PM
Anna Geller
Ievgenii Martynenko
05/04/2022, 1:02 PM
Anna Geller
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this ensures the Flow script is uploaded to S3 during registration
    local_script_path=f"{FLOW_NAME}.py",
)
#2 Package code dependencies into a Docker image and push the image to a registry, e.g. ECR.
Both steps, pushing the flow code to S3 and pushing the image to a registry, can be automated with a CI/CD pipeline.
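As a rough illustration of what such a pipeline step could run, here is a minimal sketch that only assembles the shell commands. The helper names `ecr_image_uri` and `build_ci_commands`, the account ID, region, and repository are hypothetical placeholders, and the sketch assumes the CI job has already logged in to the registry.

```python
import shlex


def ecr_image_uri(account_id, region, repository, tag):
    """Build an ECR image URI from its parts (all values are placeholders)."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


def build_ci_commands(image, project, flow_path):
    """Return the commands a CI job could run, in order.

    Assumes registry login has already happened in an earlier step.
    """
    return [
        # package code dependencies into an image and push it to the registry
        ["docker", "build", "-t", image, "."],
        ["docker", "push", image],
        # registering the flow uploads the script to S3 when
        # stored_as_script=True and local_script_path are set on the storage
        ["prefect", "register", "--project", project, "-p", flow_path],
    ]


if __name__ == "__main__":
    image = ecr_image_uri("123456789012", "us-east-1", "community", "latest")
    for cmd in build_ci_commands(
        image, "community", "flows/s3_docker_run_local_image.py"
    ):
        print(shlex.join(cmd))
```

A real pipeline would run these with `subprocess.run(cmd, check=True)` or list them directly as steps in the CI configuration.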
LMK if you need some examples.
Ievgenii Martynenko
05/04/2022, 1:13 PM
Anna Geller
import platform
import prefect
from prefect import Flow, Parameter, task
from prefect.client.secrets import Secret
from prefect.storage import S3
from prefect.run_configs import DockerRun
import subprocess

PREFECT_PROJECT_NAME = "community"
FLOW_NAME = "s3_docker_run_local_image"
AGENT_LABEL = "docker"
AWS_ACCOUNT_ID = Secret("AWS_ACCOUNT_ID").get()
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this ensures the Flow script is uploaded to S3 during registration
    local_script_path=f"{FLOW_NAME}.py",
)
RUN_CONFIG = DockerRun(labels=[AGENT_LABEL])


@task(log_stdout=True)
def hello_world(x: str):
    print(f"Hello {x} from {FLOW_NAME}!")
    print(
        f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}"
    )


with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    user_input = Parameter("user_input", default="Marvin")
    hw = hello_world(user_input)

if __name__ == "__main__":
    subprocess.run(
        f"prefect register --project {PREFECT_PROJECT_NAME} -p flows/s3_docker_run_local_image.py",
        shell=True,
    )
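The `labels=[AGENT_LABEL]` setting on the run config is what ties this flow to an agent started with the same label. The sketch below illustrates that idea in plain Python. It is a simplified model, not Prefect's implementation: the function name `agent_can_pick_up` is hypothetical, and the subset rule is an assumption that leaves out edge cases such as empty label sets.

```python
def agent_can_pick_up(flow_labels, agent_labels):
    """Return True when every label on the flow is also present on the agent.

    Simplified model (assumption): the flow's labels must be a subset of the
    agent's labels. Empty label sets are not handled specially here.
    """
    return set(flow_labels) <= set(agent_labels)


if __name__ == "__main__":
    # flow registered with DockerRun(labels=["docker"])
    print(agent_can_pick_up(["docker"], ["docker"]))
    print(agent_can_pick_up(["docker"], ["kubernetes"]))
```

Under this model, an agent started with the label "docker" would poll for runs of the flow above, while an agent labeled only "kubernetes" would not.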
Ievgenii Martynenko
05/04/2022, 1:29 PM
Ievgenii Martynenko
05/04/2022, 1:32 PM
Ievgenii Martynenko
05/04/2022, 1:48 PM
Anna Geller
Anna Geller
Ievgenii Martynenko
05/04/2022, 2:02 PM
Anna Geller
flow.register("your_project")
or using the CLI:
prefect register --project yourproject -p yourflow.py
If this is confusing to you, check these resources:
• https://docs.prefect.io/orchestration/getting-started/registering-and-running-a-flow.html
• https://discourse.prefect.io/t/when-do-i-need-to-reregister-my-flow-which-changes-in-flow-metadata-result-in-prefect-bumping-up-the-flow-version/403
Ievgenii Martynenko
05/04/2022, 2:07 PM
Anna Geller
Anna Geller
Ievgenii Martynenko
05/04/2022, 2:11 PM
Anna Geller
Ievgenii Martynenko
05/04/2022, 2:16 PM
Anna Geller
Ievgenii Martynenko
05/04/2022, 2:57 PM
Ievgenii Martynenko
05/05/2022, 7:17 AM
Mateo Merlo
05/05/2022, 10:21 PM
Mateo Merlo
05/05/2022, 10:36 PM
Anna Geller
prefect register --project xyz -p yourflow.py
and start an agent:
prefect agent docker start --label youragent
Then, if the run configuration in yourflow.py uses the same label "youragent", Prefect can determine which agent should poll for scheduled flow runs of that flow.
Mateo Merlo
05/07/2022, 3:21 PM
Anna Geller
export GOOGLE_APPLICATION_CREDENTIALS=/Users/you/.secrets/gcp.json
Anna Geller
Mateo Merlo
05/07/2022, 5:27 PM
FROM prefecthq/prefect:latest-python3.9
RUN /usr/local/bin/python -m pip install --upgrade pip
WORKDIR /opt/prefect
COPY requirements.txt .
COPY gcs.json .
COPY setup.py .
RUN pip install .
I built the image and pushed it to a registry in GCP.
In my k8s.cfg I have this line:
prefect agent kubernetes start --env GOOGLE_APPLICATION_CREDENTIALS="/opt/prefect/gcs.json"
I checked from the console by accessing the pod, and the file is there.
Perhaps I have to provide my credentials in a different way instead of baking them into the Docker image.
Anna Geller
FROM prefecthq/prefect:latest-python3.9
RUN /usr/local/bin/python -m pip install --upgrade pip
WORKDIR /opt/prefect
COPY requirements.txt .
COPY gcs.json .
COPY setup.py .
RUN pip install .
ENV GOOGLE_APPLICATION_CREDENTIALS=/opt/prefect/gcs.json
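To confirm inside the running container that the variable set by the `ENV` line is visible and points to a real file, a small check like the one below could be run from a Python shell or at the start of a task. This is a minimal sketch using only the standard library; the function name `resolve_gcp_credentials` is hypothetical.

```python
import os
from pathlib import Path


def resolve_gcp_credentials(env=None):
    """Return the credentials path from GOOGLE_APPLICATION_CREDENTIALS.

    Raises RuntimeError if the variable is unset or empty, and
    FileNotFoundError if it points to a path that is not a file.
    """
    env = os.environ if env is None else env
    value = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not value:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    path = Path(value)
    if not path.is_file():
        raise FileNotFoundError(f"Credentials file not found: {path}")
    return path


if __name__ == "__main__":
    try:
        print(f"Using credentials at {resolve_gcp_credentials()}")
    except (RuntimeError, FileNotFoundError) as exc:
        print(f"Credentials check failed: {exc}")
```

If the check fails in the flow-run container but the file exists in the agent pod, the variable is probably being set on a different container than the one executing the flow.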
Mateo Merlo
05/07/2022, 5:49 PM
Mateo Merlo
05/07/2022, 5:50 PM
Anna Geller
Mateo Merlo
05/07/2022, 6:14 PM