Thomas La Piana

    2 years ago
    Would anyone be willing to give me some insight into how they productionize their flows? I've got a flow written and I've gotten Prefect Server up and running in k8s, but how do I go about getting my local flows to execute on the k8s agent? More in the thread.
    Should I be using some kind of git-sync to get the flows I write to the agent pod/container?
    Scott Zelenka

    2 years ago
    From your local machine, you need to serialize the flow and store it somewhere. Then you notify the Server that the flow exists so it can orchestrate it. This is a super basic example:
    from prefect import task, Flow
    from prefect.environments import RemoteEnvironment
    from prefect.environments.storage.docker import Docker
    
    # assumes you have your auth token stored in ~/.prefect/config.toml
    
    @task
    def hello():
        print("World!")
    
    
    with Flow("hello_cae") as f:
        hello()
    
    
    f.storage = Docker()
    
    f.environment = RemoteEnvironment()
    
    f.register(build=True)
    The f.register call is what tells the scheduler about the Flow, and f.storage is where you're storing the serialized flow.
    Jeremiah

    2 years ago
    ^ To echo Scott, Prefect’s framework differs from Airflow in that there’s no need to re-parse the “DAGfile” to run a task. Instead, you build your flow into a supplied Storage object (in Scott’s example, a Docker image for which you can provide a custom registry/tag), and also an Environment object that tells Prefect how to load from that storage (deploy into Kubernetes, run with the local Docker daemon, run in memory, etc.). Then, when you register the flow, that storage/environment information gets serialized and the metadata is attached to the flow. An agent, upon receiving the instruction to run that flow, now has all the information it needs to 1) load the flow from storage and 2) execute its environment, thereby reconstructing the flow from bytes. One of my wishlist items is a GitHubStorage 🙂
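    As a rough mental model of the build/register/agent split described above, here is a toy, stdlib-only sketch. None of these names are Prefect's API: InMemoryStorage, register, agent_run, SERVER_METADATA and TASK_LIBRARY are all hypothetical, and flows are serialized as JSON lists of task names instead of pickling the whole Flow object the way Prefect's storage classes do.

```python
import json
from typing import Callable, Dict, List

# Hypothetical stand-in for the task code that ships inside the storage
# artifact (e.g. a Docker image). Not a Prefect API.
TASK_LIBRARY: Dict[str, Callable[[], str]] = {
    "hello": lambda: "World!",
}


class InMemoryStorage:
    """Hypothetical storage backend that keeps serialized flows as bytes."""

    def __init__(self) -> None:
        self._blobs: Dict[str, bytes] = {}

    def build(self, flow_name: str, task_names: List[str]) -> str:
        # "Build" step: turn the flow into bytes and return its storage key.
        payload = {"name": flow_name, "tasks": task_names}
        self._blobs[flow_name] = json.dumps(payload).encode("utf-8")
        return flow_name

    def load(self, key: str) -> dict:
        return json.loads(self._blobs[key].decode("utf-8"))


# The toy "server" keeps only metadata: where each flow lives and how to run it.
SERVER_METADATA: Dict[str, dict] = {}


def register(flow_name: str, task_names: List[str], storage: InMemoryStorage) -> None:
    key = storage.build(flow_name, task_names)
    SERVER_METADATA[flow_name] = {"storage": storage, "key": key, "environment": "in-process"}


def agent_run(flow_name: str) -> List[str]:
    # The agent never re-parses the original source file: it reads the
    # metadata, loads the bytes from storage, and rebuilds the flow.
    meta = SERVER_METADATA[flow_name]
    if meta["environment"] != "in-process":
        raise NotImplementedError("this toy agent only runs flows in-process")
    flow = meta["storage"].load(meta["key"])
    return [TASK_LIBRARY[name]() for name in flow["tasks"]]


storage = InMemoryStorage()
register("hello_cae", ["hello"], storage)
print(agent_run("hello_cae"))  # ['World!']
```

    The point of the toy is that agent_run only needs the stored metadata and bytes, never the original script.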
    Thomas La Piana

    2 years ago
    Thanks so much, that makes perfect sense! I was having trouble getting the mental model down, but that's a much better deployment story than having to sync a DAGs folder 🙂
    Pedro Machado

    2 years ago
    Also interested in this topic. So do you just run your Python script manually? If you have multiple flows, do you run multiple scripts? How would you implement a CI/CD pipeline? Thanks!
    Scott Zelenka

    2 years ago
    @Pedro Machado I'm going through CI/CD right now via Jenkins. My plan is to have Jenkins perform the serialization and .register to Prefect Cloud upon a Git PR to master.
    Thomas La Piana

    2 years ago
    I'm going the same route as @Scott Zelenka and will have GitHub Actions run a script that does the registration.
    Joe Schmid

    2 years ago
    @Thomas La Piana & @Scott Zelenka We're doing some work on CI/CD right now. Are you planning to "package all Flows into one container & register them all on every build" or "1 Flow per container & only register that Flow"?
    (I feel like there are some Flow-specific CI/CD best practices for us as the community to collaborate on and document.)
    Jeremiah

    2 years ago
    I’d value learning more about how people like to do this as well!
    Thomas La Piana

    2 years ago
    I'm doing the same as Scott. I have a Python script that just builds and registers a flow; that flow's only task is to execute a flow that is already inside another Docker container.
    @Joe Schmid I'm going the 1-flow-per-container route.
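    A wrapper flow whose only task launches a flow that already lives in another image could have that task shell out to Docker, roughly as below. This is a sketch under assumptions: the image tag, the flows.etl module, and the convention that the inner flow starts with python -m <module> are all hypothetical, and docker_run_command / run_flow_in_container are illustrative helpers, not Prefect APIs.

```python
import shlex
import subprocess
from typing import Dict, List, Optional


def docker_run_command(image: str, flow_module: str, env: Optional[Dict[str, str]] = None) -> List[str]:
    """Build the argv for running a flow module inside an existing image.

    Assumes (hypothetically) that the inner flow can be started with
    `python -m <flow_module>` inside `image`.
    """
    command = ["docker", "run", "--rm"]
    # Pass configuration as environment variables, in a stable order.
    for key, value in sorted((env or {}).items()):
        command += ["-e", f"{key}={value}"]
    command += [image, "python", "-u", "-m", flow_module]
    return command


def run_flow_in_container(image: str, flow_module: str, env: Optional[Dict[str, str]] = None) -> int:
    # The wrapper flow's single job: launch the container and report its exit code.
    command = docker_run_command(image, flow_module, env)
    print("Running:", shlex.join(command))
    return subprocess.run(command, check=False).returncode


# Only builds and prints the command; nothing is executed here.
print(shlex.join(docker_run_command("my-registry/flows:dev-ci-13217", "flows.etl", {"STAGE": "dev"})))
```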
    Joe Schmid

    2 years ago
    Here's our plan for CI/CD (evolving in real time):
    1. Have a single repo with library code (our own Python module) and Flows.
    2. Package all Flows into a single Docker image. (After 4 years with Airflow we now have hundreds of DAGs, so managing that many images seems unwieldy when the only difference between them might be the serialized .flow file itself.)
    3. Tag the image and git commit with a build event ID label (e.g. dev-ci-13217).
    4. Only re-register Flows that have changed. In our early attempts without step 4, it was annoying that a Flow that hadn't changed would get a new version, easy viewing of recent task runs would get reset, etc.
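    Step 4 can also be done without git by comparing content hashes. This is a swapped-in technique, not the git-based approach used in this thread, and the registered.json state file and helper names are hypothetical. One caveat: hashing only the flow file won't notice changes in shared library code that the flow imports.

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Dict, Iterable, List


def file_digest(path: Path) -> str:
    """SHA-256 of a flow file's bytes."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def load_state(state_file: Path) -> Dict[str, str]:
    # Hypothetical JSON file mapping flow path -> digest at last registration.
    return json.loads(state_file.read_text()) if state_file.exists() else {}


def flows_to_register(flow_files: Iterable[Path], state_file: Path) -> List[Path]:
    """Return only the flow files whose content changed since they were last recorded."""
    previous = load_state(state_file)
    return [path for path in flow_files if previous.get(str(path)) != file_digest(path)]


def record_registered(flow_files: Iterable[Path], state_file: Path) -> None:
    """Call after a successful registration so unchanged flows are skipped next time."""
    state = load_state(state_file)
    for path in flow_files:
        state[str(path)] = file_digest(path)
    state_file.write_text(json.dumps(state, indent=2, sort_keys=True))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    flow_a, flow_b = root / "flow_a.py", root / "flow_b.py"
    flow_a.write_text("x = 1\n")
    flow_b.write_text("y = 2\n")
    state_file = root / "registered.json"

    first = flows_to_register([flow_a, flow_b], state_file)  # both are new
    record_registered(first, state_file)
    flow_b.write_text("y = 3\n")
    print([p.name for p in flows_to_register([flow_a, flow_b], state_file)])  # ['flow_b.py']
```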
    For step 4, we use something like:
    git diff-tree --no-commit-id --name-only -r `meta get build.sha` | docker run -e PREFECT__CLOUD__AGENT__AUTH_TOKEN=${PREFECT__CLOUD__AGENT__AUTH_TOKEN} -e BUILD_TAG=${BUILD_TAG} -i ${PREFECT_FLOWS_IMAGE}:${BUILD_TAG} bash -c 'prefect auth login -t $PREFECT__CLOUD__AGENT__AUTH_TOKEN ; python -u -m srm.datasci.lib.flow.register_flows'
    This gets the list of files changed in the current commit (we should tweak this to get files changed since the commit of the last successful build...), then pipes that list into the Docker image we just built, which logs in to Prefect Cloud and runs register_flows.
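    The tweak mentioned above (files changed since the last successful build, not just the current commit) could be sketched like this, assuming your CI can supply the SHA of the last successful build; where that SHA comes from is left to the CI system. The helper names are hypothetical.

```python
import subprocess
from typing import List


def changed_files_command(last_successful_sha: str, current_sha: str = "HEAD") -> List[str]:
    # `git diff A B` compares the two trees directly, so it covers every commit
    # since the last successful build rather than only the newest one.
    # `--diff-filter=d` (lowercase) excludes deleted files, which can't be registered.
    return ["git", "diff", "--name-only", "--diff-filter=d", last_successful_sha, current_sha]


def parse_changed_files(output: str) -> List[str]:
    """Turn git's newline-separated output into a clean list of paths."""
    return [line.strip() for line in output.splitlines() if line.strip()]


def changed_files(last_successful_sha: str, current_sha: str = "HEAD") -> List[str]:
    # Runs git in the current working directory (assumed to be the repo checkout).
    result = subprocess.run(
        changed_files_command(last_successful_sha, current_sha),
        stdout=subprocess.PIPE,
        check=True,
        text=True,
    )
    return parse_changed_files(result.stdout)
```

    The newline-joined result of changed_files(...) could then be piped into register_flows.py in place of the git diff-tree output.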
    register_flows.py looks like:
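    import subprocess
    import sys
    
    # "src/" is the package root, so it is not part of the module name.
    SRC_ROOT = "src/"
    FLOWS_DIRECTORY = "src/srm/datasci/flows/"
    
    print("Looking for Flows to register in the list of changed files for this commit...")
    for line in sys.stdin:
        path = line.strip()
        # Only Python files inside the flows directory, skipping package __init__ files.
        if path.startswith(FLOWS_DIRECTORY) and path.endswith(".py") and not path.endswith("__init__.py"):
            # e.g. "src/srm/datasci/flows/foo.py" -> "srm.datasci.flows.foo"
            flow_module = path[len(SRC_ROOT):-len(".py")].replace("/", ".")
            command = ["python", "-u", "-m", flow_module, "register"]
            print("Running: {}".format(" ".join(command)))
            result = subprocess.run(command, stdout=subprocess.PIPE)
            print(result.stdout.decode("utf-8"))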
    This just looks for changed files in our flows directory, converts their file paths to Python module names, and then runs "register" for each. We use a convention where every flow must be runnable from the command line and accept one argument, register.
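    The command-line convention described above might look like this in each flow module. build_flow, register and run are hypothetical placeholders that return strings so the sketch runs without Prefect; in a real module build_flow would construct the Flow and register would call flow.register(...).

```python
import argparse
from typing import List, Optional


def build_flow() -> dict:
    # Hypothetical placeholder: a real module would construct the Prefect Flow here.
    return {"name": "example_flow"}


def register(flow: dict) -> str:
    # Hypothetical placeholder for registering the flow with the server.
    return "registered {}".format(flow["name"])


def run(flow: dict) -> str:
    # Hypothetical placeholder for running the flow locally.
    return "ran {}".format(flow["name"])


def main(argv: Optional[List[str]] = None) -> str:
    parser = argparse.ArgumentParser(description="Run or register this flow.")
    # One optional positional argument: "register", or "run" by default.
    parser.add_argument("action", nargs="?", default="run", choices=["run", "register"])
    args = parser.parse_args(argv)
    flow = build_flow()
    message = register(flow) if args.action == "register" else run(flow)
    print(message)
    return message


if __name__ == "__main__":
    # `python -u -m <flow_module> register` ends up here.
    main()
```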
    Thomas La Piana

    2 years ago
    Interesting, that's a nice way to do it! I'm still toying with getting Dask Gateway to work as a custom environment.