Dijar Vrella
07/18/2024, 3:40 PM
Marvin
07/18/2024, 3:40 PM
To set `command` and `environment` at runtime in Prefect, you can use flow parameters. This will allow you to override default values when creating flow runs.
Here's a step-by-step guide on how to do this:
1. Define Parameters in Your Flow:
Ensure your flow is set up to accept parameters:
python
from prefect import flow, task

@task
def create_docker_container(command, environment):
    # Your Docker container creation logic goes here
    pass

# In Prefect 2, flow parameters are the arguments of the @flow function;
# the defaults below are used unless a run overrides them.
@flow(name="my_flow")
def my_flow(
    command=["bash", "-c", "./train.sh", "/data/datasets/data.pkl"],
    environment=["ENV1=$ENV1", "ENV2=$ENV2"],
):
    create_docker_container(command, environment)
2. Creating Deployments:
If you're using a Prefect deployment, you can set default parameter values there and override them when needed.
Create a `deployment.yaml`:
yaml
name: my_deployment
flow_name: my_flow
parameters:
  command: ["bash", "-c", "./train.sh", "/data/datasets/data.pkl"]
  environment: ["ENV1=$ENV1", "ENV2=$ENV2"]
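As a rough model of how these layers relate, the sketch below merges flow defaults, deployment parameters, and run-time overrides in that order of precedence. The helper name `resolve_parameters` is hypothetical and this is not Prefect's internal code; it only illustrates the behavior described here.

```python
def resolve_parameters(flow_defaults, deployment_params, run_overrides):
    """Later layers win: run overrides > deployment parameters > flow defaults."""
    resolved = dict(flow_defaults)
    resolved.update(deployment_params)
    resolved.update(run_overrides)
    return resolved


flow_defaults = {
    "command": ["bash", "-c", "./train.sh", "/data/datasets/data.pkl"],
    "environment": ["ENV1=$ENV1", "ENV2=$ENV2"],
}
# The deployment does not change anything here, so the flow defaults apply.
deployment_params = {}
# Only "command" is overridden for this run; "environment" keeps its default.
run_overrides = {
    "command": ["bash", "-c", "new_command.sh", "/new/path/data.pkl"],
}

resolved = resolve_parameters(flow_defaults, deployment_params, run_overrides)
print(resolved)
```

A parameter that is not overridden keeps the value from the nearest lower layer, which is why a run can change `command` alone.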
3. Override Parameters on Flow Run:
You can override these parameters at runtime using the Prefect UI or CLI:
Using Prefect CLI:
bash
prefect deployment run my_flow/my_deployment -p 'command=["bash", "-c", "new_command.sh", "/new/path/data.pkl"]' -p 'environment=["NEW_ENV1=value1", "NEW_ENV2=value2"]'
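Each `-p` value in the command above is a `key=value` pair whose value is JSON, which is why the lists are written with double-quoted strings inside single shell quotes. A minimal sketch of that interpretation follows; `parse_param_pairs` is a hypothetical helper for illustration, not the CLI's actual implementation.

```python
import json


def parse_param_pairs(pairs):
    """Split each 'key=value' pair at the first '=' and decode the value as JSON."""
    params = {}
    for pair in pairs:
        key, _, raw_value = pair.partition("=")
        params[key] = json.loads(raw_value)
    return params


params = parse_param_pairs([
    'command=["bash", "-c", "new_command.sh", "/new/path/data.pkl"]',
    'environment=["NEW_ENV1=value1", "NEW_ENV2=value2"]',
])
print(params)
```

Splitting only at the first `=` matters because the `environment` entries themselves contain `=` characters.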
Using Prefect UI:
Navigate to the deployment's detail page, click Run, and then specify the new values for `command` and `environment` before triggering the flow run.
Here are helpful resources from the Prefect documentation:
- Flow Parameters Documentation
- Deployments Documentation
This approach lets you set and override task parameters like `command` and `environment` at runtime for each flow run.
Dijar Vrella
07/18/2024, 4:07 PM
Marvin
07/18/2024, 4:07 PM
in this deployment how do I specify the params:
prefect deployment inspect pull-and-run-image/train_flow
{
    'id': '2eb6ef1b-76c4-4c36-9333-b1a17dc9c8d1',
    'created': '2024-07-18T15:00:56.777394+00:00',
    'updated': '2024-07-18T15:00:56.775200+00:00',
    'infra_overrides': {},
    'name': 'train_flow',
    'version': None,
    'description': None,
    'flow_id': '2a1e74e2-4e93-457e-93cc-bce747e629c8',
    'schedule': None,
    'is_schedule_active': True,
    'paused': False,
    'schedules': [],
    'parameters': {},
    'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/opt/prefect'}}],
    'tags': [],
    'work_queue_name': 'default',
    'last_polled': None,
    'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {}},
    'path': None,
    'entrypoint': 'flow.py:pull_and_run_image',
    'manifest_path': None,
    'storage_document_id': None
Marvin
07/18/2024, 4:08 PM
You can set the parameters in your `deployment.yaml` file. Based on the provided inspection output, here is how you can modify your `deployment.yaml` to include the parameters:
1. Modify the `deployment.yaml`:
You need to add the `parameters` section with the default values you want to set for `command` and `environment`. Here’s an example:
yaml
name: train_flow
flow_name: pull-and-run-image
parameters:
  command: ["bash", "-c", "./train.sh", "/data/datasets/data.pkl"]
  environment: ["ENV1=$ENV1", "ENV2=$ENV2"]
work_queue_name: default
work_pool_name: seedoo-custom-worker
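One detail in the inspection output is worth checking first: `parameter_openapi_schema` has empty `properties`, which suggests the flow function at `flow.py:pull_and_run_image` currently takes no arguments, so parameters set on the deployment may be rejected until the function declares them. The sketch below shows the idea with plain `inspect`; `parameter_properties` and the second function are hypothetical stand-ins, not Prefect's schema generator.

```python
import inspect


def parameter_properties(fn):
    """Build a minimal {name: {"default": ...}} mapping from a function signature."""
    properties = {}
    for name, param in inspect.signature(fn).parameters.items():
        entry = {}
        if param.default is not inspect.Parameter.empty:
            entry["default"] = param.default
        properties[name] = entry
    return properties


async def pull_and_run_image():
    """Stand-in for the current flow: no arguments, so nothing can be overridden."""


async def pull_and_run_image_with_params(command=None, environment=None):
    """Stand-in for a flow that declares the two parameters."""


no_params = parameter_properties(pull_and_run_image)
with_params = parameter_properties(pull_and_run_image_with_params)
print(no_params)
print(sorted(with_params))
```

The first result is empty, matching the `'properties': {}` in the inspection output; the second lists `command` and `environment`.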
2. Apply the Deployment:
Once you have updated your `deployment.yaml`, apply it using the Prefect CLI:
bash
prefect deployment apply deployment.yaml
3. Override Parameters at Runtime:
You can override these parameters when you run the deployment using the Prefect CLI or UI.
Using Prefect CLI:
bash
prefect deployment run pull-and-run-image/train_flow -p 'command=["bash", "-c", "new_command.sh", "/new/path/data.pkl"]' -p 'environment=["NEW_ENV1=value1", "NEW_ENV2=value2"]'
Using Prefect UI:
- Navigate to the deployment’s detail page.
- Click Run.
- Specify the new values for `command` and `environment` before triggering the flow run.
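The same override can also be issued from Python. This is a sketch that assumes Prefect 2 is installed and the API is reachable; `build_overrides` and `trigger_run` are hypothetical helper names, while `run_deployment` comes from `prefect.deployments`.

```python
def build_overrides(command, environment):
    """Collect the two parameters into the dict shape a deployment run expects."""
    return {"command": list(command), "environment": list(environment)}


def trigger_run(deployment_name, overrides):
    """Start a run of '<flow name>/<deployment name>' with the given overrides."""
    # Imported lazily so build_overrides can be used without Prefect installed.
    from prefect.deployments import run_deployment

    return run_deployment(name=deployment_name, parameters=overrides)


overrides = build_overrides(
    ["bash", "-c", "new_command.sh", "/new/path/data.pkl"],
    ["NEW_ENV1=value1", "NEW_ENV2=value2"],
)
print(overrides)

# Uncomment once the deployment exists and the Prefect API is reachable:
# flow_run = trigger_run("pull-and-run-image/train_flow", overrides)
```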
By following these steps, you can customize and override the `command` and `environment` parameters for each flow run.
For more details, refer to the official Prefect documentation on deployments.
Dijar Vrella
07/18/2024, 4:09 PM
Marvin
07/18/2024, 4:09 PM
here is my flow.py flow:
import asyncio
from prefect import flow, task, get_run_logger
from prefect_docker.images import pull_docker_image
from prefect_docker.containers import create_docker_container, start_docker_container, get_docker_container_logs
from prefect_docker.credentials import DockerRegistryCredentials

@task
async def pull_image(credentials):
    logger = get_run_logger()
    # Pull the latest image from the private repository
    logger.info("Pulling the latest image from the private repository...")
    image = await pull_docker_image(
        repository="seedooinsights/build",
        tag="latest",
        docker_registry_credentials=credentials
    )
    logger.info(f"Pulled image: {image}")
    return image

@task
async def create_container():
    logger = get_run_logger()
    # Create a container with the specified flags
    logger.info("Creating a container with the specified flags...")
    container = await create_docker_container(
        image="seedooinsights/build:latest",
        command=["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"],
        environment=[
            "MKL_NUM_THREADS=$MKL_NUM_THREADS",
            "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS",
            "NUMBA_THREADING_LAYER=$NUMBA_THREADING_LAYER",
            "OMP
Marvin
07/18/2024, 4:11 PM
here is my flow.py flow:
import asyncio
from prefect import flow, task, get_run_logger
from prefect_docker.images import pull_docker_image
from prefect_docker.containers import create_docker_container, start_docker_container, get_docker_container_logs
from prefect_docker.credentials import DockerRegistryCredentials

@task
async def pull_image(credentials):
    logger = get_run_logger()
    image = await pull_docker_image(
        repository="seedooinsights/build",
        tag="latest",
        docker_registry_credentials=credentials
    )
    return image

@task
async def create_container():
    logger = get_run_logger()
    container = await create_docker_container(
        image="seedooinsights/build:latest",
        command=["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"],
        environment=[
            "MKL_NUM_THREADS=$MKL_NUM_THREADS",
            "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS",
            "NUMBA_THREADING_LAYER=$NUMBA_THREADING_LAYER",
            "OMP_NUM_THREADS=$OMP_NUM_THREADS",
            "OPENBLAS_NUM_THREADS=$OPENBLAS_NUM_THREADS",
        ],
        volumes=[
            "/seedoodata:/seedoodata",
        ],
        device_requests=[
            {
                'Driver': 'nvidia',
                'Count': -1,
                'Capabilities': [['gpu']],
            },
        ],
        group_add=["seedoo"],
Marvin
07/18/2024, 4:12 PM
here is my flow.py flow:
import asyncio
from prefect import flow, task, get_run_logger
from prefect_docker.images import pull_docker_image
from prefect_docker.containers import create_docker_container, start_docker_container, get_docker_container_logs
from prefect_docker.credentials import DockerRegistryCredentials

@task
async def pull_image(credentials):
    logger = get_run_logger()
    image = await pull_docker_image(
        repository="seedooinsights/build",
        tag="latest",
        docker_registry_credentials=credentials
    )
    return image

@task
async def create_container():
    logger = get_run_logger()
    container = await create_docker_container(
        image="seedooinsights/build:latest",
        command=["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"],
        environment=[
            "MKL_NUM_THREADS=$MKL_NUM_THREADS",
            "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS",
        ],
        volumes=[
            "/seedoodata:/seedoodata",
        ],
    )
    return container

@task
async def start_container(container_id):
    logger = get_run_logger()
    started_container = await start_docker_container(container_id=container_id)
    return started_container

@flow
async def pull_and_run_image():
    logger = get_run_logger()
    docker_registry_credentials = await DockerRegistryCredentials.load("seedoo-docker-registry")
Marvin
07/18/2024, 4:13 PM
To expose the `command` and `environment` parameters in your `flow.py`, you should define these as parameters in your flow and then pass them to your tasks. Here is the modified version of your `flow.py`:
python
from prefect import flow, task, get_run_logger
from prefect_docker.images import pull_docker_image
from prefect_docker.containers import create_docker_container, start_docker_container, get_docker_container_logs
from prefect_docker.credentials import DockerRegistryCredentials

@task
async def pull_image(credentials):
    logger = get_run_logger()
    logger.info("Pulling the latest image from the private repository...")
    image = await pull_docker_image(
        repository="seedooinsights/build",
        tag="latest",
        docker_registry_credentials=credentials,
    )
    return image

@task
async def create_container(image, command, environment):
    # `image` is the image name string, e.g. "seedooinsights/build:latest"
    logger = get_run_logger()
    logger.info(f"Creating a container from {image}...")
    container = await create_docker_container(
        image=image,
        command=command,
        environment=environment,
        volumes=[
            "/seedoodata:/seedoodata",
        ],
    )
    return container

@task
async def start_container(container_id):
    logger = get_run_logger()
    logger.info(f"Starting container {container_id}...")
    started_container = await start_docker_container(container_id=container_id)
    return started_container

# The function keeps the name pull_and_run_image so that it still matches the
# deployment's entrypoint, flow.py:pull_and_run_image.
@flow
async def pull_and_run_image(
    command=["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"],
    environment=["MKL_NUM_THREADS=$MKL_NUM_THREADS", "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS"],
):
    # Block.load is a coroutine inside an async flow, so it must be awaited
    credentials = await DockerRegistryCredentials.load("seedoo-docker-registry")
    await pull_image(credentials)
    container = await create_container("seedooinsights/build:latest", command, environment)
    # Pass the container ID, not the container object
    started_container = await start_container(container.id)
    logs = await get_docker_container_logs(container_id=started_container.id)
    return logs
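One caveat about the `environment` entries: a value such as `"MKL_NUM_THREADS=$MKL_NUM_THREADS"` is likely passed to the container as a literal string, because no shell expands it along the way. Assuming the intent is to forward the values from the process that runs the flow, a small helper can expand them first. `expand_environment` is a hypothetical name, and the value `4` below is set only for the demonstration.

```python
import os


def expand_environment(entries):
    """Replace $VAR and ${VAR} references with values from the current process environment."""
    return [os.path.expandvars(entry) for entry in entries]


# Example value for the demonstration; in practice the worker's environment provides it.
os.environ["MKL_NUM_THREADS"] = "4"

expanded = expand_environment(["MKL_NUM_THREADS=$MKL_NUM_THREADS"])
print(expanded)
```

The expanded list could then be passed as the `environment` argument. References to variables that are not set are left unchanged by `os.path.expandvars`.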
Now, update your `deployment.yaml` to include default parameters:
yaml
name: train_flow
flow_name: pull-and-run-image
parameters:
  command: ["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"]
  environment: ["MKL_NUM_THREADS=$MKL_NUM_THREADS", "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS"]
work_queue_name: default
work_pool_name: seedoo-custom-worker
Apply the deployment using:
bash
prefect deployment apply deployment.yaml
To override these parameters at runtime:
bash
prefect deployment run pull-and-run-image/train_flow -p 'command=["bash", "-c", "new_command.sh", "/new/path/data.pkl"]' -p 'environment=["NEW_ENV1=value1", "NEW_ENV2=value2"]'
Alternatively, specify the new values in the Prefect UI before triggering the flow run. With this setup, the `command` and `environment` parameters are customizable at runtime.
Dijar Vrella
07/20/2024, 11:59 AM