Thomas La Piana
04/29/2020, 12:48 PM

Scott Zelenka
04/29/2020, 12:50 PM

```python
from prefect import task, Flow
from prefect.environments import RemoteEnvironment
from prefect.environments.storage.docker import Docker

# assumes you have your auth token stored in ~/.prefect/config.toml
@task
def hello():
    print("World!")

with Flow("hello_cae") as f:
    hello()

f.storage = Docker()
f.environment = RemoteEnvironment()
f.register(build=True)
```
f.register is what tells the scheduler about the Flow, and f.storage is where you're storing the serialized flow.

Jeremiah
04/29/2020, 12:58 PM
You build your flow into a supplied Storage object (in Scott's example, a Docker container for which you can provide a custom registry/tag), and also an Environment object that tells Prefect how to load from that storage (deploy into Kubernetes, run with the local Docker daemon, run in memory, etc.). Then, when you register the flow, that storage/environment information gets serialized and the metadata is attached to the flow. An agent, upon receiving the instruction to run that flow, now has all the information it requires to 1) load it from storage and 2) execute its environment, thereby reconstructing it from bytes.
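That store-bytes-then-reconstruct round trip can be sketched with stdlib marshal standing in for Prefect's real Storage/Environment machinery. Everything here is a toy illustration of the general pattern, not Prefect's actual implementation:

```python
import marshal
import types

def hello():
    return "World!"

# "register" side: serialize the flow to bytes plus some run metadata
# (a stand-in for what Storage holds and what Environment describes)
stored = {
    "flow_bytes": marshal.dumps(hello.__code__),
    "environment": {"type": "remote"},
}

# "agent" side: load from storage and rebuild the callable from bytes
code = marshal.loads(stored["flow_bytes"])
rebuilt = types.FunctionType(code, globals(), "hello")
print(rebuilt())  # World!
```

In real Prefect, the agent uses the registered storage metadata to pull the flow (e.g. a Docker image) rather than raw marshalled bytes, but the load-then-execute shape is the same.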
One of my wishlist items is a GitHubStorage 🙂

Thomas La Piana
04/29/2020, 1:39 PM

Pedro Machado
04/29/2020, 2:25 PM

Scott Zelenka
04/29/2020, 3:47 PM
.register to Prefect Cloud upon Git PR to master.

Thomas La Piana
04/29/2020, 3:48 PM

Joe Schmid
04/30/2020, 3:53 PM

Jeremiah
04/30/2020, 4:50 PM

Thomas La Piana
05/02/2020, 3:36 AM

Joe Schmid
05/04/2020, 1:25 PM
dev-ci-13217)
4. Only re-register Flows that have changed.
In our early attempts without step 4, it was annoying that a Flow that hadn't changed would get a new version, easy viewing of recent task runs would get reset, etc.

```shell
git diff-tree --no-commit-id --name-only -r `meta get build.sha` | docker run -e PREFECT__CLOUD__AGENT__AUTH_TOKEN=${PREFECT__CLOUD__AGENT__AUTH_TOKEN} -e BUILD_TAG=${BUILD_TAG} -i ${PREFECT_FLOWS_IMAGE}:${BUILD_TAG} bash -c 'prefect auth login -t $PREFECT__CLOUD__AGENT__AUTH_TOKEN ; python -u -m srm.datasci.lib.flow.register_flows'
```
This gets the list of files changed in the current commit (should tweak this to get files changed since the commit of the last successful build...) then runs the docker image we just built.

```python
import subprocess
import sys

FLOWS_DIRECTORY = "src/srm/datasci/flows/"

print("Looking for Flows to register in the list of changed files for this commit...")
for line in sys.stdin:
    if line.startswith(FLOWS_DIRECTORY) and "__init__.py" not in line:
        # line[4:-4] strips the leading "src/" and the trailing ".py\n",
        # turning a changed-file path into a dotted module name
        flow_module = line[4:-4].replace("/", ".")
        command = ["python", "-u", "-m", flow_module, "register"]
        print("Running: {} {} {} {} {}".format(*command))
        result = subprocess.run(command, stdout=subprocess.PIPE)
        print(result.stdout.decode("utf-8"))
```
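As a quick check of the line[4:-4] slice in the script above (the file path here is a hypothetical example): it drops the leading "src/" and the trailing ".py" plus the newline that sys.stdin leaves on each line:

```python
line = "src/srm/datasci/flows/my_flow.py\n"  # hypothetical changed file
flow_module = line[4:-4].replace("/", ".")   # strip "src/" and ".py\n"
print(flow_module)  # srm.datasci.flows.my_flow
```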
This just looks for files that changed in our flows directory, converts the file paths to Python module names, and then runs "register" for each. We use a convention where every flow must be able to be run from the command line and accept one argument for register.

Thomas La Piana
05/04/2020, 2:40 PM
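The one-argument convention Joe describes (each flow module runnable via python -u -m <module> register) might look roughly like this inside a flow module. This is a sketch only: register_flow and run_flow are stand-ins, and a real module would call the Prefect flow's register() instead:

```python
import sys

def register_flow():
    # stand-in for flow.register() in a real Prefect flow module
    return "registered"

def run_flow():
    # stand-in for running the flow locally
    return "ran"

def main(argv):
    # convention: a single "register" argument registers the flow;
    # any other invocation just runs it
    if argv and argv[0] == "register":
        return register_flow()
    return run_flow()

if __name__ == "__main__":
    print(main(sys.argv[1:]))
```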