# prefect-community

Thomas La Piana

04/29/2020, 12:48 PM
would anyone be willing to give me some insight into how they productionalize their flows? I've got a flow written and I've gotten Prefect server up and running in k8s, but how do i go about getting my local flows to execute on the k8s agent? more in the thread
should i be using some kind of git-sync to get the flows i write to the agent pod/container?

Scott Zelenka

04/29/2020, 12:50 PM
from your local machine, you need to serialize the flow and store it somewhere. Then you notify the Server that the flow exists, and to orchestrate it. This is a super basic example:
```python
from prefect import task, Flow
from prefect.environments import RemoteEnvironment
from prefect.environments.storage.docker import Docker

# assumes you have your auth token stored in ~/.prefect/config.toml

@task
def hello():
    print("World!")


with Flow("hello_cae") as f:
    hello()


f.storage = Docker()

f.environment = RemoteEnvironment()

f.register(build=True)
```
the `f.register` is what tells the scheduler about the Flow, and the `f.storage` is where you're storing the serialized flow

Jeremiah

04/29/2020, 12:58 PM
^ to echo Scott, Prefect's framework differs from Airflow in that there's no need to re-parse the "DAGfile" to run a task. Instead, you `build` your flow into a supplied `Storage` object (in Scott's example, a Docker container for which you can provide a custom registry/tag), and also an `Environment` object that tells Prefect how to load from that storage (deploy into kubernetes, run with local docker daemon, run in memory, etc. etc. etc). Then, when you `register` the flow, that storage/environment information gets serialized and the metadata is attached to the flow. An `agent`, upon receiving the instruction to run that flow, now has all the information it requires to 1) load it from storage and 2) execute its environment, thereby reconstructing it from bytes. One of my wishlist items is a `GitHubStorage` 🙂
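For reference, a minimal sketch of the registry/tag customization Jeremiah mentions, using the same 0.10-era API as Scott's example (the registry URL, image name, and tag below are made-up placeholders, not anyone's actual setup):
```python
from prefect import task, Flow
from prefect.environments import RemoteEnvironment
from prefect.environments.storage.docker import Docker

@task
def hello():
    print("World!")

with Flow("hello_k8s") as f:
    hello()

# Docker storage builds an image containing the serialized flow and pushes it
# to the registry you point it at; the agent later pulls that image to run the flow.
f.storage = Docker(
    registry_url="registry.example.com/data-team",  # placeholder registry
    image_name="hello-k8s-flow",                    # placeholder image name
    image_tag="v1",                                 # placeholder tag
)

# The environment tells the agent how to execute the flow once it's loaded from storage.
f.environment = RemoteEnvironment()

# Serializes the storage/environment metadata and registers the flow with the server.
f.register(build=True)
```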

Thomas La Piana

04/29/2020, 1:39 PM
thanks so much, that makes perfect sense! was having trouble getting the mental model down but that's a much better deployment story than having to sync a DAGs folder 🙂
👍 2

Pedro Machado

04/29/2020, 2:25 PM
Also interested in this topic. So do you just run your Python script manually? If you have multiple flows, do you run multiple scripts? How would you implement a CI/CD pipeline? Thanks!

Scott Zelenka

04/29/2020, 3:47 PM
@Pedro Machado I'm going through CI/CD right now via Jenkins. My plan is to have Jenkins perform the serialization and `.register` to Prefect Cloud upon Git PR to master.

Thomas La Piana

04/29/2020, 3:48 PM
i'm going with the same route as @Scott Zelenka, going to have GitHub Actions run a script that does the registration

Joe Schmid

04/30/2020, 3:53 PM
@Thomas La Piana & @Scott Zelenka We're doing some work on CI/CD right now. Are you planning to "package all Flows into one container & register them all on every build" or "1 Flow per container & only register that Flow"?
(I feel like there are some Flow-specific CI/CD best practices for us as the community to collaborate on and document.)
👍 1

Jeremiah

04/30/2020, 4:50 PM
I’d value learning more about how people like to do this as well!
👍 1

Thomas La Piana

05/02/2020, 3:36 AM
i'm doing the same as Scott. i have a python script that just builds and registers a flow; that flow's only task is to execute a flow that is already inside of another docker container
@Joe Schmid i'm going the 1 flow per container route
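That wrapper pattern, sketched very roughly (the image name is a placeholder, Thomas's actual script may differ, and this assumes the flow's runtime can reach a Docker daemon):
```python
from prefect import Flow
from prefect.environments.storage.docker import Docker
from prefect.tasks.shell import ShellTask

# The wrapper flow's single task shells out to run an image that already
# contains the real work. The image reference below is a placeholder.
run_container = ShellTask(name="run-etl-container")

with Flow("run-containerized-etl") as flow:
    run_container(command="docker run --rm registry.example.com/jobs/etl:latest")

if __name__ == "__main__":
    # Build the wrapper flow's own storage and register it with the server.
    flow.storage = Docker()
    flow.register(build=True)
```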

Joe Schmid

05/04/2020, 1:25 PM
Here's our plan for CI/CD (evolving in real time):
1. Have a single repo with library code (our own Python module) and Flows.
2. Package all Flows into a single docker image. (After 4 years with Airflow we now have hundreds of DAGs, so managing that many images seems unwieldy when the only difference between them might be the serialized .flow file itself.)
3. Tag the image and git commit with a build event ID label (e.g. `dev-ci-13217`).
4. Only re-register Flows that have changed. In our early attempts without step 4, it was annoying that a Flow that hadn't changed would get a new version, easy viewing of recent task runs would get reset, etc.
For step 4, we use something like:
```bash
git diff-tree --no-commit-id --name-only -r `meta get build.sha` | docker run -e PREFECT__CLOUD__AGENT__AUTH_TOKEN=${PREFECT__CLOUD__AGENT__AUTH_TOKEN} -e BUILD_TAG=${BUILD_TAG} -i ${PREFECT_FLOWS_IMAGE}:${BUILD_TAG} bash -c 'prefect auth login -t $PREFECT__CLOUD__AGENT__AUTH_TOKEN ; python -u -m srm.datasci.lib.flow.register_flows'
```
This gets the list of files changed in the current commit (should tweak this to get files changed since the commit of the last successful build...) then runs the docker image we just built.
register_flows.py looks like:
```python
import subprocess
import sys

FLOWS_DIRECTORY = "src/srm/datasci/flows/"
print("Looking for Flows to register in the list of changed files for this commit...")
for line in sys.stdin:
    if line.startswith(FLOWS_DIRECTORY) and "__init__.py" not in line:
        flow_module = "{}".format(line[4:-4].replace("/", "."))
        command = ["python", "-u", "-m", flow_module, "register"]
        print("Running: {} {} {} {} {}".format(*command))
        result = subprocess.run(command, stdout=subprocess.PIPE)
        print(result.stdout.decode("utf-8"))
```
This just looks for files that changed in our flows directory, changes file paths to python modules, and then runs "register" for each. We use a convention where every flow must be able to be run from the command line and accept 1 argument for `register`.
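Joe's message doesn't show one of those flow modules; a minimal hypothetical example of the convention (the module path, flow, and storage details are placeholders rather than his actual code) might look like:
```python
# Hypothetical src/srm/datasci/flows/example_flow.py: the module can be run
# directly, and a single "register" argument switches it from a local run
# to registration against the Prefect backend.
import sys

from prefect import task, Flow

@task
def say_hello():
    print("Hello from CI!")

with Flow("example-flow") as flow:
    say_hello()

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "register":
        # Storage/environment configuration (e.g. pointing Docker storage at
        # the image CI just built) would go here before registering.
        flow.register(build=False)
    else:
        flow.run()
```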

Thomas La Piana

05/04/2020, 2:40 PM
interesting, that's a nice way to do it! i'm still toying with getting dask gateway to work as a custom environment
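One untested way that dask gateway experiment might start, building on the `RemoteEnvironment`/`DaskExecutor` pieces above rather than anything Thomas described (the gateway address is a placeholder, and the TLS credentials a Gateway-managed cluster normally requires are glossed over):
```python
from dask_gateway import Gateway
from prefect import task, Flow
from prefect.environments import RemoteEnvironment
from prefect.environments.storage.docker import Docker

# Create (or look up) a Dask cluster through Dask Gateway. Placeholder URL;
# real deployments also need Gateway auth configuration.
gateway = Gateway("http://dask-gateway.example.com")
cluster = gateway.new_cluster()
cluster.scale(4)

@task
def crunch():
    return sum(range(1_000_000))

with Flow("dask-gateway-experiment") as flow:
    crunch()

flow.storage = Docker()
# Point the DaskExecutor at the Gateway-managed scheduler. The cluster must
# still be running when the agent executes the flow, and the distributed
# Client's security/TLS options are omitted here but needed in practice.
flow.environment = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": cluster.scheduler_address},
)

flow.register(build=True)
```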