Hi all, we’ve done a POC of Prefect and love what ...
# prefect-community
r
Hi all, we’ve done a POC of Prefect and love what the tool has to offer. Been able to get flows running and tied to Prefect Cloud but am struggling to understand scaling and deployment workflow from the documentation and articles online. Would love to write one up and post it on Medium once figured out! My questions are: • Is this structure an expected structure? • How are the new jobs supposed to be registered through CI/CD? • How are dependencies across job files managed? For example, if theres multiple fact tables dependent on d_sample, how do I set up the fact job files? • Does the agent have to be restarted every time a new file flow is registered?
z
Hi @Ryan Kelly! You’ve touched on some open questions here — namely “How do I organize my Prefect project and register using CI/CD?” We’re still determining the best pattern for this as it varies depending on our users.
Is this an expected structure
Your project setup looks fine! I typically would start an agent with
prefect agent <type> start
rather than from Python but that’s not a big deal.
How are the new jobs supposed to be registered in CI?
I would write a python script that imports your flows and calls
flow.register
on each one. Personally, I’ve added a global list to my module and appended each flow to that then I iterate the list and call the register function. Then you just need to call this python script from your CI of choice.
How are dependencies across job files managed?
I don’t know what a fact table is 🙂 If the location where the agent is running has your module available on the python path (ie you did
pip install
or edited the
PYTHONPATH
env var) then when your flow runs it will be able to import it and be happy. You can also use a Docker agent / storage to ensure your flow’s requirements are packaged alongside it.
Does the agent have to be restarted every time a new flow is registered?
no 🙂 the agent queries the server for new flow runs and the server tells the agent where the flow is stored so it can pull it and then run it
🙌 1
r
Your project setup looks fine! I typically would start an agent with 
prefect agent <type> start
 rather than from Python but that’s not a big deal.
How is it possible to import jobs if the agent is launched via CLI?
I would write a python script that imports your flows and calls 
flow.register
 on each one. Personally, I’ve added a global list to my module and appended each flow to that then I iterate the list and call the register function. Then you just need to call this python script from your CI of choice.
This increments the version every time right? I was thinking in git pushes to have CI scripts get the files with changes and run them as main to register the flow. Seems similar to what youre saying.
I don’t know what a fact table is 🙂 If the location where the agent is running has your module available on the python path (ie you did 
pip install
 or edited the 
PYTHONPATH
 env var) then when your flow runs it will be able to import it and be happy
Easy example is… Dim table is products and Fact table is orders of products by product id. Dim is needed to flesh out info. Im not completely following here but using the above example it sounds like the file for d_products should be imported into f_orders and then f_orders should use StartRunFlow with
wait
set to
True
and set_upstream tied to d_products?
also thank you for your help on this @Zanie!
z
The agent launches your flow which imports the tasks it needs. It would increment the version every time, you could limit it to the master branch, specific git tags, or check git diffs of flow files before registering. One problem is that your tasks may be spread across several files. I don’t get your easy example 🙂 I’d recommend creating an example in code that shows your dependencies but with tasks that don’t actually do anything in a single file then paste it here if you have questions.
b
There's also a usecase where one wants to deploy a flow ad hoc. Not sure how do you do that. @Ryan Kelly I think an deployment diagrams to help describe how flow source are registered and how the runners are being used would definitely help. imo the "mechanics" of a flow lifecycle is not well documented.
z
“flow lifecycle” is on our docs todo list 🙂
upvote 1
🚀 3
d
This is a great discussion, thanks. On project structure and setup - is there any github link to refer to a sample project ? just wanted to understand how the entire source code is packaged to run different flows.
upvote 1
z
There isn’t yet — we have a private repo that we’re working on internally but it’s not ready.
d
ok cool, thanks
z
One option is setting up your code as a Python module, using docker storage, and installing your entire project in the docker image. Here’s a blurb from my code
Copy code
from typing import List
from prefect.storage.docker import Docker
from prefect_flows import PROJECT_PATH, PROJECT_NAME, config


def ProjectDockerStorage(project_extras: List[str] = None, **kwargs) -> Docker:
    """
    A thin wrapper around `prefect.storage.Docker` with

    - Installation of this project and any given extras
    - Handling for CI docker client/server setups

    Cannot be a class because then it is not a known serializable storage type so this
    is just an instance factory for Docker storage
    """

    # CircleCI -------------------------------------------------------------------------
    # Configure docker storage building for CI which tends to have a different base
    # url and require a TLS config. This may be fixed in Prefect upstream and could then
    # be removed.

    import docker

    default_client = docker.from_env()

    # This will break local runs which makes me think this *may* be CircleCI specific
    if default_client.api.base_url != "<http+docker://localhost>":
        kwargs.setdefault("base_url", default_client.api.base_url)

    kwargs.setdefault("tls_config", docker.TLSConfig(default_client.api.cert))

    # Project installation -------------------------------------------------------------

    # Copy this project into the docker image
    kwargs.setdefault("files", {})
    kwargs["files"][str(PROJECT_PATH)] = PROJECT_NAME

    # Install the project so it's on the Python path
    extras = ""
    if project_extras:
        extras = f"[{','.join(project_extras)}]"

    kwargs.setdefault("extra_dockerfile_commands", [])
    kwargs["extra_dockerfile_commands"].append(
        f"RUN pip install -e {PROJECT_NAME}{extras}"
    )

    kwargs.setdefault("registry_url", config.docker.registry_url)

    return Docker(**kwargs)
d
Awesome, this is super helpful. Thanks for sharing !