Ryan Kelly
01/21/2021, 4:49 PM
Zanie
> Is this an expected structure?
Your project setup looks fine! I would typically start an agent with `prefect agent <type> start` rather than from Python, but that’s not a big deal.
> How are the new jobs supposed to be registered in CI?
I would write a Python script that imports your flows and calls `flow.register` on each one. Personally, I’ve added a global list to my module and appended each flow to it; then I iterate over the list and call the register function. Then you just need to call this Python script from your CI of choice.
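The global-list approach described above could look roughly like this. It is only a sketch: `FLOWS`, `add_flow`, and `register_all` are hypothetical names, and the objects in the list are assumed to expose `register(project_name=...)` the way a Prefect 0.14 `Flow` does. Nothing in the block imports Prefect, so it runs on its own.

```python
from typing import Any, List

# Hypothetical module-level registry; each flow module appends its flow here.
FLOWS: List[Any] = []


def add_flow(flow: Any) -> Any:
    """Append a flow to the registry and return it unchanged."""
    FLOWS.append(flow)
    return flow


def register_all(project_name: str) -> List[Any]:
    """Call `register` on every collected flow and return the results.

    Assumption: each object has a `register(project_name=...)` method,
    as `prefect.Flow` does in Prefect 0.14.
    """
    return [flow.register(project_name=project_name) for flow in FLOWS]
```

A CI job would then import the modules that define the flows (so each one lands in `FLOWS`) and call `register_all` with the project name.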
> How are dependencies across job files managed?
I don’t know what a fact table is 🙂 If the location where the agent is running has your module available on the Python path (i.e. you did `pip install` or edited the `PYTHONPATH` env var), then when your flow runs it will be able to import it and be happy. You can also use a Docker agent / storage to ensure your flow’s requirements are packaged alongside it.
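One way to check whether the agent's environment can see a module is to ask the import system directly. This is a minimal sketch using only the standard library; `missing_modules` is a hypothetical helper and it is meant for top-level module names only.

```python
import importlib.util
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found on the Python path."""
    # find_spec returns None when no finder can locate a top-level module.
    return [name for name in names if importlib.util.find_spec(name) is None]
```

Running this with your own package name in the same environment as the agent shows whether the `pip install` or `PYTHONPATH` change took effect there.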
> Does the agent have to be restarted every time a new flow is registered?
No 🙂 The agent queries the server for new flow runs, and the server tells the agent where the flow is stored so it can pull it and then run it.
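To illustrate why no restart is needed, here is a toy model of that loop. It is not Prefect's code: the "server" is a plain list of scheduled runs, "storage" is a dict keyed by made-up locations, and `poll_once` is a hypothetical name. The point is only that the flow is looked up when a run arrives, not when the agent starts.

```python
from typing import Callable, Dict, List


def poll_once(
    scheduled_runs: List[Dict[str, str]],
    storage: Dict[str, Callable[[], str]],
) -> List[str]:
    """Drain the scheduled runs, pulling each flow from storage right before running it."""
    results = []
    while scheduled_runs:
        run = scheduled_runs.pop(0)
        flow = storage[run["location"]]  # "pull" the flow from where the server says it lives
        results.append(flow())
    return results
```

A flow added to `storage` after the first `poll_once` call is picked up by the next call without recreating anything.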
Ryan Kelly
01/21/2021, 5:11 PM
> Your project setup looks fine! I typically would start an agent with `prefect agent <type> start` rather than from Python but that’s not a big deal.
How is it possible to import jobs if the agent is launched via CLI?
> I would write a python script that imports your flows and calls `flow.register` on each one. Personally, I’ve added a global list to my module and appended each flow to that then I iterate the list and call the register function. Then you just need to call this python script from your CI of choice.
This increments the version every time, right? I was thinking that on git pushes, CI scripts would get the files with changes and run them as main to register the flow. Seems similar to what you’re saying.
> I don’t know what a fact table is 🙂 If the location where the agent is running has your module available on the python path (ie you did `pip install` or edited the `PYTHONPATH` env var) then when your flow runs it will be able to import it and be happy
An easy example: the dim table is products and the fact table is orders of products by product id. The dim is needed to flesh out info. I’m not completely following here, but using the above example it sounds like the file for d_products should be imported into f_orders, and then f_orders should use `StartFlowRun` with `wait` set to True and set_upstream tied to d_products?
Zanie
BK Lau
01/21/2021, 5:46 PM
Zanie
Darshan
01/21/2021, 6:29 PM
Zanie
Darshan
01/21/2021, 6:41 PM
Zanie
from typing import List

from prefect.storage.docker import Docker

from prefect_flows import PROJECT_PATH, PROJECT_NAME, config


def ProjectDockerStorage(project_extras: List[str] = None, **kwargs) -> Docker:
    """
    A thin wrapper around `prefect.storage.Docker` with
    - Installation of this project and any given extras
    - Handling for CI docker client/server setups

    Cannot be a class because then it is not a known serializable storage type so this
    is just an instance factory for Docker storage
    """
    # CircleCI -------------------------------------------------------------------------
    # Configure docker storage building for CI which tends to have a different base
    # url and require a TLS config. This may be fixed in Prefect upstream and could then
    # be removed.
    import docker

    default_client = docker.from_env()

    # This will break local runs which makes me think this *may* be CircleCI specific
    if default_client.api.base_url != "http+docker://localhost":
        kwargs.setdefault("base_url", default_client.api.base_url)
        kwargs.setdefault("tls_config", docker.TLSConfig(default_client.api.cert))

    # Project installation -------------------------------------------------------------
    # Copy this project into the docker image
    kwargs.setdefault("files", {})
    kwargs["files"][str(PROJECT_PATH)] = PROJECT_NAME

    # Install the project so it's on the Python path
    extras = ""
    if project_extras:
        extras = f"[{','.join(project_extras)}]"

    kwargs.setdefault("extra_dockerfile_commands", [])
    kwargs["extra_dockerfile_commands"].append(
        f"RUN pip install -e {PROJECT_NAME}{extras}"
    )

    kwargs.setdefault("registry_url", config.docker.registry_url)

    return Docker(**kwargs)
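The extras handling in the factory above can be tried in isolation. This sketch lifts just the string-building step into a standalone function; `pip_install_command` is a hypothetical name and is not part of the original module or of Prefect.

```python
from typing import List, Optional


def pip_install_command(
    project_name: str, project_extras: Optional[List[str]] = None
) -> str:
    """Build the Dockerfile line that installs the project, with optional extras."""
    extras = ""
    if project_extras:
        # Produces e.g. "[aws,dev]" to append to the package name.
        extras = f"[{','.join(project_extras)}]"
    return f"RUN pip install -e {project_name}{extras}"
```

Calling it with no extras yields a plain editable install line, and passing a list appends the bracketed, comma-separated extras to the project name.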
Darshan
01/21/2021, 6:44 PM