# ask-marvin
d
@Marvin I have a flow.py that has various tasks and one main flow. Which then pulls images, runs container, starts container etc. On the step where we create_docker_container I want to make the command and environment fully customizable and override the default values defined in the flow.py: command=["bash", "-c", "./train.sh", "/data/datasets/data/traffic/data.pkl"], environment=[ "ENV1=$ENV1", "ENV2=$ENV2", ], to something like where I supply command and environment part dynamically on flow run.
m
To dynamically set task parameters like `command` and `environment` at runtime in Prefect, make them flow parameters. You can then override their default values whenever you create a flow run. Here's a step-by-step guide:

1. Define parameters in your flow. In Prefect 2, the arguments of the `@flow`-decorated function are the flow's parameters, so give them your current values as defaults and pass them on to the task:
```python
from typing import List

from prefect import flow, task


@task
def create_container(command: List[str], environment: List[str]):
    # Your Docker container creation logic, e.g. a call to
    # prefect_docker.containers.create_docker_container(command=command, environment=environment, ...)
    ...


@flow(name="my_flow")
def my_flow(
    command: List[str] = ["bash", "-c", "./train.sh", "/data/datasets/data.pkl"],
    environment: List[str] = ["ENV1=$ENV1", "ENV2=$ENV2"],
):
    # Each argument is a flow parameter that can be overridden per flow run
    create_container(command, environment)
```
2. Create a deployment. A deployment can store default parameter values that individual runs then override. For example, in a `deployment.yaml`:

```yaml
name: my_deployment
flow_name: my_flow
parameters:
  command: ["bash", "-c", "./train.sh", "/data/datasets/data.pkl"]
  environment: ["ENV1=$ENV1", "ENV2=$ENV2"]
```
3. Override parameters on a flow run, from the Prefect CLI or the UI. Using the Prefect CLI:

```bash
prefect deployment run my_flow/my_deployment -p 'command=["bash", "-c", "new_command.sh", "/new/path/data.pkl"]' -p 'environment=["NEW_ENV1=value1", "NEW_ENV2=value2"]'
```
Using the Prefect UI: navigate to the deployment's detail page, click Run, and specify new values for `command` and `environment` before triggering the flow run.

Helpful resources from the Prefect documentation:
- Flow Parameters documentation
- Deployments documentation

This approach lets you set and override `command` and `environment` for each flow run.
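The `-p` values in step 3 have to reach the flow as real lists, not strings. As far as I know, Prefect 2's `prefect deployment run -p key=value` tries to decode each value as JSON and keeps it as a plain string otherwise. The stdlib sketch below approximates that behaviour (it is not Prefect's own code; `parse_param` and `parse_params` are hypothetical names) so you can check your shell quoting locally before triggering a run:

```python
import json
from typing import Any, Dict, List, Tuple


def parse_param(spec: str) -> Tuple[str, Any]:
    """Split one 'key=value' argument and decode the value as JSON when possible.

    Approximates how `prefect deployment run -p` treats values; this is not Prefect's code.
    """
    key, sep, raw = spec.partition("=")  # split on the first '=' only
    if not sep:
        raise ValueError(f"expected key=value, got {spec!r}")
    try:
        value: Any = json.loads(raw)
    except json.JSONDecodeError:
        value = raw  # not valid JSON: keep it as a plain string
    return key, value


def parse_params(specs: List[str]) -> Dict[str, Any]:
    """Collect several -p arguments into one parameters dict (later keys win)."""
    return dict(parse_param(spec) for spec in specs)


params = parse_params([
    'command=["bash", "-c", "new_command.sh", "/new/path/data.pkl"]',
    'environment=["NEW_ENV1=value1", "NEW_ENV2=value2"]',
])
print(params)
```

An unquoted list such as `command=[bash, -c]` is not valid JSON, so under this parsing the flow would receive the literal string `"[bash, -c]"` instead of a list. Keep single quotes around the whole argument and double quotes around each item.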
d
@Marvin in this deployment how do I specify the params: prefect deployment inspect pull-and-run-image/train_flow { 'id': '2eb6ef1b-76c4-4c36-9333-b1a17dc9c8d1', 'infra_overrides': {}, 'name': 'train_flow', 'version': None, 'description': None, 'flow_id': '2a1e74e2-4e93-457e-93cc-bce747e629c8', 'schedule': None, 'is_schedule_active': True, 'paused': False, 'schedules': [], 'parameters': {}, 'pull_steps': [{'prefect.deployments.steps.set_working_directory': {'directory': '/opt/prefect'}}], 'work_queue_name': 'default', 'last_polled': None, 'parameter_openapi_schema': {'title': 'Parameters', 'type': 'object', 'properties': {}}, 'path': None, 'entrypoint': 'flow.py:pull_and_run_image', 'manifest_path': None, 'storage_document_id': None, 'infrastructure_document_id': None, 'created_by': None, 'updated_by': None, 'work_queue_id': None, 'enforce_parameter_schema': False, 'work_pool_name': 'seedoo-custom-worker', }
m
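Your inspect output shows `'parameters': {}` and an empty `parameter_openapi_schema`, which means the flow behind this deployment (entrypoint `flow.py:pull_and_run_image`) doesn't accept any parameters yet. First add `command` and `environment` as arguments of `pull_and_run_image` and pass them through to your container-creation task. The parameter schema is generated from the flow function's signature, so you need to redeploy after changing it. Then set the defaults on the deployment:

1. Modify `deployment.yaml`: add a `parameters` section with the default values you want for `command` and `environment`. Here's an example: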
```yaml
name: train_flow
# The flow's own name; "pull-and-run-image/train_flow" is the <flow>/<deployment> identifier used by the CLI
flow_name: pull-and-run-image
parameters:
  command: ["bash", "-c", "./train.sh", "/data/datasets/data.pkl"]
  environment: ["ENV1=$ENV1", "ENV2=$ENV2"]
work_queue_name: default
work_pool_name: seedoo-custom-worker
```
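2. Apply the deployment. Once you have updated `deployment.yaml`, apply it with the Prefect CLI:

```bash
prefect deployment apply deployment.yaml
```

The `pull_steps` in your inspect output suggest this deployment was created with `prefect deploy` from a `prefect.yaml`. In that case, add the same `parameters` block under the `train_flow` entry in `prefect.yaml` and run `prefect deploy` again instead.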
3. Override parameters at runtime. You can override these parameters when you run the deployment from the Prefect CLI or UI. Using the Prefect CLI:

```bash
prefect deployment run pull-and-run-image/train_flow -p 'command=["bash", "-c", "new_command.sh", "/new/path/data.pkl"]' -p 'environment=["NEW_ENV1=value1", "NEW_ENV2=value2"]'
```
Using the Prefect UI:
- Navigate to the deployment's detail page.
- Click Run.
- Specify the new values for `command` and `environment` before triggering the flow run.

With this in place, you can override `command` and `environment` for each flow run. For more details, refer to the official Prefect documentation on deployments.
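How run-time values combine with the deployment's `parameters`: my understanding (treat it as an assumption) is that Prefect 2 merges them per top-level key, so each key you pass replaces the deployment default for that key wholesale. The practical consequence is that overriding `environment` replaces the entire list; individual entries are not merged. A plain-Python sketch of that rule, with a hypothetical `effective_parameters` helper:

```python
from typing import Any, Dict


def effective_parameters(deployment_defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Shallow merge: each key in `overrides` replaces the deployment default wholesale."""
    return {**deployment_defaults, **overrides}


defaults = {
    "command": ["bash", "-c", "./train.sh", "/data/datasets/data.pkl"],
    "environment": ["ENV1=$ENV1", "ENV2=$ENV2"],
}

# Override only `environment`; `command` keeps its deployment default
run_params = effective_parameters(defaults, {"environment": ["NEW_ENV1=value1"]})
print(run_params["command"])
print(run_params["environment"])
```

If you want to add one variable while keeping the defaults, pass the full list (defaults plus the new entry) in the override.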
d
@Marvin here is my flow.py flow:
```python
import asyncio
from prefect import flow, task, get_run_logger
from prefect_docker.images import pull_docker_image
from prefect_docker.containers import create_docker_container, start_docker_container, get_docker_container_logs
from prefect_docker.credentials import DockerRegistryCredentials

@task
async def pull_image(credentials):
    logger = get_run_logger()
    image = await pull_docker_image(
        repository="seedooinsights/build",
        tag="latest",
        docker_registry_credentials=credentials
    )
    return image

@task
async def create_container():
    logger = get_run_logger()
    container = await create_docker_container(
        image="seedooinsights/build:latest",
        command=["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"],
        environment=[
            "MKL_NUM_THREADS=$MKL_NUM_THREADS",
            "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS",
        ],
        volumes=[
            "/seedoodata:/seedoodata",
        ],
    )
    return container

@task
async def start_container(container_id):
    logger = get_run_logger()
    started_container = await start_docker_container(container_id=container_id)
    return started_container

@flow
async def pull_and_run_image():
    logger = get_run_logger()
    docker_registry_credentials = await DockerRegistryCredentials.load("seedoo-docker-registry")
    image = await pull_image(docker_registry_credentials)
    container = await create_container()
    started_container = await start_container(container.id)
    await asyncio.sleep(20)  # Add a 10 second sleep
    container_logs = await get_docker_container_logs_flow(container.id)

if __name__ == "__main__":
    asyncio.run(pull_and_run_image())
```
m
Your message was too long by 571 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
here is my flow.py flow:
import asyncio from prefect import flow, task, get_run_logger from prefect_docker.images import pull_docker_image from prefect_docker.containers import create_docker_container, start_docker_container, get_docker_container_logs from prefect_docker.credentials import DockerRegistryCredentials @task async def pull_image(credentials): logger = get_run_logger() # Pull the latest image from the private repository logger.info("Pulling the latest image from the private repository...") image = await pull_docker_image( repository="seedooinsights/build", tag="latest", docker_registry_credentials=credentials ) logger.info(f"Pulled image: {image}") return image @task async def create_container(): logger = get_run_logger() # Create a container with the specified flags logger.info("Creating a container with the specified flags...") container = await create_docker_container( image="seedooinsights/build:latest", command=["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"], environment=[ "MKL_NUM_THREADS=$MKL_NUM_THREADS", "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS", "NUMBA_THREADING_LAYER=$NUMBA_THREADING_LAYER", "OMP
Your message was too long by 172 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
here is my flow.py flow:
import asyncio from prefect import flow, task, get_run_logger from prefect_docker.images import pull_docker_image from prefect_docker.containers import create_docker_container, start_docker_container, get_docker_container_logs from prefect_docker.credentials import DockerRegistryCredentials @task async def pull_image(credentials): logger = get_run_logger() image = await pull_docker_image( repository="seedooinsights/build", tag="latest", docker_registry_credentials=credentials ) return image @task async def create_container(): logger = get_run_logger() container = await create_docker_container( image="seedooinsights/build:latest", command=["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"], environment=[ "MKL_NUM_THREADS=$MKL_NUM_THREADS", "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS", "NUMBA_THREADING_LAYER=$NUMBA_THREADING_LAYER", "OMP_NUM_THREADS=$OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS=$OPENBLAS_NUM_THREADS", ], volumes=[ "/seedoodata:/seedoodata", ], device_requests=[ { 'Driver': 'nvidia', 'Count': -1, 'Capabilities': [['gpu']], }, ], group_add=["seedoo"],
d
> @Marvin here is my flow.py flow: import asyncio from prefect import flow, task, get_run_logger from prefect_docker.images import pull_docker_image from prefect_docker.containers import create_docker_container, start_docker_container, get_docker_container_logs from prefect_docker.credentials import DockerRegistryCredentials @task async def pull_image(credentials): logger = get_run_logger() image = await pull_docker_image( repository="seedooinsights/build", tag="latest", docker_registry_credentials=credentials ) return image @task async def create_container(): logger = get_run_logger() container = await create_docker_container( image="seedooinsights/build:latest", command=["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"], environment=[ "MKL_NUM_THREADS=$MKL_NUM_THREADS", "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS", ], volumes=[ "/seedoodata:/seedoodata", ], ) return container @task async def start_container(container_id): logger = get_run_logger() started_container = await start_docker_container(container_id=container_id) return started_container
m
To make `command` and `environment` customizable at runtime in your `flow.py`, define them as parameters of your flow and pass them to your tasks. Here is the modified version of your `flow.py`:
```python
import asyncio
from typing import List

from prefect import flow, task, get_run_logger
from prefect_docker.images import pull_docker_image
from prefect_docker.containers import (
    create_docker_container,
    start_docker_container,
    get_docker_container_logs,
)
from prefect_docker.credentials import DockerRegistryCredentials

IMAGE = "seedooinsights/build:latest"


@task
async def pull_image(credentials):
    logger = get_run_logger()
    logger.info("Pulling %s", IMAGE)
    image = await pull_docker_image(
        repository="seedooinsights/build",
        tag="latest",
        docker_registry_credentials=credentials,
    )
    return image


@task
async def create_container(command: List[str], environment: List[str]):
    logger = get_run_logger()
    logger.info("Creating container with command=%s", command)
    container = await create_docker_container(
        image=IMAGE,  # the image name string, not the pulled Image object
        command=command,
        environment=environment,
        volumes=[
            "/seedoodata:/seedoodata",
        ],
    )
    return container


@task
async def start_container(container_id: str):
    started_container = await start_docker_container(container_id=container_id)
    return started_container


# Keep the original function name so the deployment entrypoint
# (flow.py:pull_and_run_image) still resolves after redeploying.
@flow
async def pull_and_run_image(
    command: List[str] = ["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"],
    environment: List[str] = ["MKL_NUM_THREADS=$MKL_NUM_THREADS", "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS"],
):
    # Block.load must be awaited inside an async flow
    credentials = await DockerRegistryCredentials.load("seedoo-docker-registry")
    await pull_image(credentials)
    container = await create_container(command, environment)
    await start_container(container.id)  # pass the ID, not the Container object
    await asyncio.sleep(20)  # give the container time to write output before reading logs
    logs = await get_docker_container_logs(container_id=container.id)
    return logs


if __name__ == "__main__":
    asyncio.run(pull_and_run_image())
```
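One thing to check in the `environment` defaults: entries such as `"MKL_NUM_THREADS=$MKL_NUM_THREADS"` are handed to the Docker SDK as-is. No shell is involved, so `$MKL_NUM_THREADS` is not expanded and the container receives the literal text `$MKL_NUM_THREADS`. If the intent is to forward values from the worker's own environment, one option is to expand them in Python before calling `create_docker_container`. The helper below is a hypothetical sketch (the name `expand_environment` is mine) built on `string.Template.safe_substitute`, which leaves references to unknown variables untouched:

```python
import os
from string import Template
from typing import List, Mapping, Optional


def expand_environment(
    environment: List[str], env: Optional[Mapping[str, str]] = None
) -> List[str]:
    """Expand $VAR / ${VAR} inside 'KEY=value' entries before they reach Docker.

    Hypothetical helper. Values are looked up in `env`, defaulting to os.environ of the
    process running the flow (i.e. the worker). Unknown variables are left as-is.
    """
    source = os.environ if env is None else env
    expanded = []
    for entry in environment:
        key, sep, value = entry.partition("=")  # only the first '=' separates key and value
        if not sep:
            raise ValueError(f"expected KEY=value, got {entry!r}")
        expanded.append(f"{key}={Template(value).safe_substitute(source)}")
    return expanded
```

Inside the flow you would then call `create_container(command, expand_environment(environment))`. If you'd rather pin fixed values, pass them directly as run parameters, e.g. `"MKL_NUM_THREADS=8"`.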
Next, add the default parameters to your `deployment.yaml`:

```yaml
name: train_flow
flow_name: pull-and-run-image
parameters:
  command: ["bash", "-c", "./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"]
  environment: ["MKL_NUM_THREADS=$MKL_NUM_THREADS", "NUMBA_NUM_THREADS=$NUMBA_NUM_THREADS"]
work_queue_name: default
work_pool_name: seedoo-custom-worker
```
Apply the deployment using:

```bash
prefect deployment apply deployment.yaml
```
To override these parameters at runtime:

```bash
prefect deployment run pull-and-run-image/train_flow -p 'command=["bash", "-c", "new_command.sh", "/new/path/data.pkl"]' -p 'environment=["NEW_ENV1=value1", "NEW_ENV2=value2"]'
```
Or override them in the Prefect UI by specifying new values before triggering the flow run. With this setup, `command` and `environment` are customizable for every run.
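A note on the `command` lists used throughout: with `bash -c`, only the string right after `-c` is executed; any further list items become that script's `$0`, `$1`, and so on. So `["bash", "-c", "./train.sh", "/path/data.pkl"]` runs `./train.sh` with no arguments, and the path lands in `$0` of the wrapper shell. If `train.sh` is meant to receive the path, either drop `bash -c` and pass `["./train.sh", "/path/data.pkl"]`, or join the script and its arguments into the single `-c` string. A sketch of the latter, using a hypothetical `bash_command` helper built on `shlex.join` (Python 3.8+) so paths containing spaces stay quoted:

```python
import shlex
from typing import List


def bash_command(script: str, *args: str) -> List[str]:
    """Build a Docker `command` that runs `script args...` through `bash -c`.

    Hypothetical helper: the script and its arguments are joined into the one
    string that follows -c, so the arguments reach the script as $1, $2, ...
    """
    return ["bash", "-c", shlex.join([script, *args])]


print(bash_command("./train.sh", "/seedoodata/datasets/data/traffic_lights/data.pkl"))
```

The resulting list can be supplied as the `command` parameter (for example via `-p 'command=[...]'`) like any other value.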
d
@Marvin help?