Carmen Marcos
02/07/2022, 10:16 AMAnna Geller
case
task, as shown here:
from prefect import Flow, task, case
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.run_configs import DockerRun
@task
def check_the_infrastructure():
return "Docker with label machine-1"
with Flow("parent_flow") as flow:
infra = check_the_infrastructure()
with case(infra, "Docker with label machine-1"):
child_flow_run_id = create_flow_run(
flow_name="child_flow_name",
run_config=DockerRun(
labels=["machine-1"]
), # with a specific condition like image
)
k8_child_flowrunview = wait_for_flow_run(
child_flow_run_id, raise_final_state=True, stream_logs=True
)
with case(infra, "Docker with label machine-2"):
child_flow_run_id = create_flow_run(
flow_name="child_flow_name",
run_config=DockerRun(
labels=["machine-2"]
), # with another different condition like image, label etc.
)
docker_child_flowrunview = wait_for_flow_run(
child_flow_run_id, raise_final_state=True, stream_logs=True
)
Carmen Marcos
02/07/2022, 12:16 PMAnna Geller
config.toml
on all agent instances: https://discourse.prefect.io/t/how-to-deploy-self-hosted-server-and-register-first-flows-to-the-server-backend/85
Also, that's correct - Prefect must be installed on all machines, otherwise you can't start an agent there. But they don't necessarily need to have the same configuration - they may be entirely different types of instances (one smaller one and another with much more CPU and memory) and they may (and should!) have different agent labels.
Regarding the Docker image, it's best to push it to a remote container registry e.g. DockerHub, AWS ECR, GCP GCR, Azure AKS, etc. This way, when you change the image to include some new dependencies over time, your flows will be able to pull the new image without having to rewrite or redeploy your flow. The image can also be set dynamically on the run config (e.g. DockerRun) so that you may even override the default image for a specific flow at runtime.