# ask-community
Eric:
Hello everyone, I am new to Prefect, and I am trying to make it fit what I want to do, but I find it very hard to find documentation or a video. I have two docker images in a private repo:
• `project-data`: the project that stores the python scripts to extract and load data. It has its own docker image.
• `project-orchestrator`: the project containing the tasks and flows. It runs commands on docker images (like the docker image of `project-data`).
Does anyone have this kind of setup? Can this kind of setup work?
Nate:
hi @Eric - welcome! i'll give some context that might help

> it is the project that stores the python scripts to extract, load data

usually these scripts are instrumented with flow and task decorators so that you can get retries / caching etc for your ETL work, but they certainly don't have to be

> running commands on docker images

this is pretty common. you can either directly write some python, perhaps like
```python
@task
def manage_command(command: str, **kwargs):
    ...  # babysit some docker run command from a parent process
```
or instead you can make each ETL script its own prefect "deployment" so that you could write code like this
```python
run_deployment("foo-script/deployment", parameters=dict(...))  # instead of `docker run -- whatever command`
```
where a docker worker would listen to the server for scheduled runs of this deployment (e.g. triggered by calling `run_deployment`) and then spin up a container to run that specific script
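for a concrete sense of that second option, registering an ETL script as its own deployment against a docker work pool might look roughly like this (untested sketch - the flow, work pool name, and image below are placeholders, and it assumes your private image already contains the flow code):
```python
# rough sketch: register one ETL script as its own deployment on a docker work pool
# (names and image below are placeholders, not taken from your setup)
from prefect import flow


@flow(log_prints=True)
def foo_script(kwargs: str | None = None):
    ...  # the ETL logic for this script (or a thin wrapper that calls it)


if __name__ == "__main__":
    foo_script.deploy(
        name="deployment",                                  # referenced as "foo-script/deployment"
        work_pool_name="my-docker-pool",                    # a work pool of type "docker"
        image="registry.example.com/project-data:latest",   # private image with the code baked in
        build=False,                                        # assume the image is already built...
        push=False,                                         # ...and already pushed to the registry
    )
```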

https://www.youtube.com/watch?v=KzumEnsLOe0

let me know if you have further questions!
Eric:
Hi @Nate, thanks for the reply. I watched part 2 previously, but I didn't get the answer I was looking for. In the video, you use the same project to have your python scripts, create tasks and flows, and also deploy your flows. In my python scripts in `project-data` I am not using any Prefect decorator (`@flow` or `@task`). I wanted to separate the ELT scripts logic from the orchestration logic (in `project-orchestrator`). Should I do things that way when I use Prefect? I have a `@task` with a python function where I use `subprocess`, is that the way to go?
As the docker image is in a private repo, I don't understand where I need to fill in the credentials of the private docker repo. In the `Docker Registry Credentials` block, I see this code:
```python
from prefect_docker import DockerHost, DockerRegistryCredentials

docker_host = DockerHost()
docker_registry_credentials = DockerRegistryCredentials(
    username="my_username",
    password="my_password",
    registry_url="registry.hub.docker.com",
)
with docker_host.get_client() as client:
    docker_registry_credentials.login(client)
```
Where am I supposed to use that? How do I deal with a private docker repository? I also don't get how to start a worker for a `docker` work pool. I am currently using `Coolify`, and I added this service in the docker compose of Coolify:
```yaml
agent:
    image: "prefecthq/prefect:3-python3.13"
    depends_on:
      prefect:
        condition: service_healthy
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    entrypoint:
      - /opt/prefect/entrypoint.sh
      - prefect
      - worker
      - start
      - "--pool=${DEFAULT_POOL_NAME}"
      - "--type=docker"
      - "--with-healthcheck"
      - "--name=${DEFAULT_WORKER_NAME}"
      - "--limit=${DEFAULT_POOL_LIMIT}"
    environment:
      - "PREFECT_API_URL=http://prefect:4200/api"
      - "PREFECT_API_KEY=${SERVICE_PASSWORD_APIKEY}"
      - "DEFAULT_POOL_NAME=${DEFAULT_POOL_NAME:-default}"
      - "DEFAULT_POOL_LIMIT=${DEFAULT_POOL_LIMIT:-1}"
      - "DEFAULT_WORKER_NAME=${DEFAULT_WORKER_NAME:-worker1}"
    healthcheck:
      test:
        - CMD-SHELL
        - pwd
      interval: 5s
      timeout: 5s
      retries: 3
```
Then this service was not running. Do we need to have our laptop turned on 24/7 to run flows?
Maybe to illustrate, here is a task I created:
```python
import os
import subprocess
from typing import Optional

from prefect import task, get_run_logger

# get_docker_env_file() and get_docker_image() are helpers defined elsewhere in the project


@task(
    retries=2,
    retry_delay_seconds=30,
)
def extract_data(
    kwargs: Optional[str] = None,
) -> dict:
    logger = get_run_logger()

    # Create secure env file (no secrets in command line)
    env_file = get_docker_env_file()

    try:
        cmd = [
            "docker",
            "run",
            "--rm",
            "--pull=always",
            "--env-file",
            env_file,
            get_docker_image(),
            "python",
            "-m",
            "scripts.fetch_data",
        ]

        # Add optional kwargs
        if kwargs:
            cmd.extend(["--kwargs", kwargs])

        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return {
            "status": "success",
            "stdout": result.stdout,
            "stderr": result.stderr,
        }
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to extract data: {e}")
        raise
    finally:
        # Clean up temporary env file
        try:
            os.unlink(env_file)
        except OSError:
            pass
```
Is this the way you were seeing things when talking about `babysit`? To use the `project-data` docker image in this script, how do I log in to the private repo here? Do I need to log in on my VM?
Nate:
> In the video, you use the same project to have your python scripts, create tasks and flows, and also deploy your flows.

it's true but it was circumstantial and just the simplest thing for illustration. it shouldn't really matter where your code lives

essentially it sounds like you want something we've historically called the "orchestrator pattern", where you have a parent process (flow or just a normal python script) that kicks off independent pieces of work in another process (as subprocesses, or containers in your case)

and so that's what I was suggesting with `run_deployment`, ie define a deployment called something like `run_script` that's associated with a docker work pool (you can put your registry creds on this work pool), so that your code that happens to be in `project-orchestrator` can trigger a run of your script on that isolated container specified by your deployment's docker work pool, and then the orchestrator flow can babysit all the script containers and maybe do some logic to report on the results or do downstream things. however if you don't need the full customization a work pool offers, the easiest approach sounds like
```python
# orchestrator project
from prefect import flow, task, unmapped


@task
def babysit_container(command: str, image: str):
    # use subprocess or docker sdk to send command to container like you're doing now
    ...


@flow
def babysit_containers(commands: list[str]):
    # spins up all containers concurrently (unmapped keeps the image static across the map)
    babysit_container.map(commands, unmapped('foo/bar:baz')).result()
```
so this is fine, but i'd say you're leaving the convenience of the docker work pool on the table with this approach. if you wanted the `run_deployment` approach, you'd just have to change the `babysit_container` task to call `run_deployment` instead of `subprocess.run` directly
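for example, that swap might look roughly like this (a sketch, assuming a deployment named "foo-script/deployment" exists and accepts these parameters):
```python
from prefect import task
from prefect.deployments import run_deployment


@task
def babysit_container(command: str):
    # instead of shelling out to `docker run ...`, ask the server for a run of the
    # deployment; the docker work pool / worker spins up the container for you
    flow_run = run_deployment(
        "foo-script/deployment",         # placeholder deployment name
        parameters={"kwargs": command},  # placeholder parameters
    )
    return flow_run
```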
---

> I also don't get how you start a worker for a `docker` work pool
```bash
prefect worker start --type docker --pool my-new-docker-pool
```
> Do we need to have our laptop turned on 24/7 to run flows?

if you want to schedule deployments, you need a worker listening for scheduled runs that will dispatch your deployment container according to your configured work pool. if you want the approach where you call `subprocess.run` to use the docker API/cli yourself, then you don't need a worker process alive 24/7

---

> In my python scripts in `project-data` I am not using any Prefect decorator (`@flow` or `@task`). I wanted to separate the ELT scripts logic from the orchestration logic (in `project-orchestrator`). Should I do things that way when I use Prefect?

typically people use prefect to give their ETL scripts features like retries, caching, and concurrent execution, which require the decorators, but there's no requirement that you do this

---

so that's a lot of info, but based on what you've said I'd recommend the following:
• make a docker registry credentials block like you mentioned above, and use that to get an authed docker client in your orchestrator flow
• use that client in your `babysit_container` task to trigger your script on a container as it sounds like you have been (`client.containers.run` instead of using `subprocess.run` to talk to the CLI) - rough sketch below
• for later: consider making deployments for your ETL scripts individually so that you can use `run_deployment` and very simply configure your docker infra for the ETL scripts, independently of the parent orchestrator script
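putting the first two bullets together, a rough, untested sketch (the block name, image, and command are placeholders):
```python
# rough sketch of the first two bullets (block name, image, and command are placeholders)
from prefect import flow, task, get_run_logger
from prefect_docker import DockerHost, DockerRegistryCredentials


@task(retries=2, retry_delay_seconds=30)
def babysit_container(command: list[str], image: str) -> str:
    logger = get_run_logger()
    creds = DockerRegistryCredentials.load("my-registry-creds")  # the block you created
    with DockerHost().get_client() as client:
        creds.login(client)  # authed client, so the private image can be pulled
        # run the command in a container of the private image and wait for it to exit
        logs = client.containers.run(image, command, remove=True)
        logger.info(logs.decode())
        return logs.decode()


@flow
def orchestrate():
    babysit_container(
        command=["python", "-m", "scripts.fetch_data"],
        image="registry.example.com/project-data:latest",
    )
```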
Eric:
Thanks @Nate for the long reply 🙏 I will check now to see how it goes, and keep you updated.
About the `docker` worker listening to the `docker` work pool, I didn't get what is actually running. Is it this service in the docker compose (the same `agent` service I shared above), or something else?
I don’t get what is a worker physically? What differs a `docker`worker to a
process
worker? In Coolify, we just have by default a process work pool.
Nate:
a worker is just a daemon / long-lived process that polls the prefect server (ie the scheduler) to see if there's work that should be submitted.

one way or another a run is scheduled on the server, and every so often the worker asks the server "any more runs from the work pool i'm listening to?" if yes, then it submits work the only way it knows how, per its `type`: docker workers submit flow runs as docker containers, process workers submit flow runs as subprocesses.

tldr: workers poll for work and create the runtime infra for the flow run according to the work pool they listen to
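concretely, the only difference on your side is the work pool's type, e.g. (placeholder pool name):
```bash
# create a work pool whose type decides how runs are executed,
# then start a worker that polls it
prefect work-pool create my-docker-pool --type docker
prefect worker start --pool my-docker-pool
```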
Eric:
Hi @Nate, thanks for all your help. I succeeded in running a full flow with the architecture where the orchestration docker image babysits the data docker image. It was not obvious though, as I needed to modify the default settings of the Docker work pool:
• `Volumes`: give access to docker by providing the socket mapping
• `Networks`: add at least the network where the Prefect UI/API is (or the worker, I am not sure, but they are in the same network in my docker compose)
Also I needed a custom docker compose in order to run a `docker` worker, compared to the `process` worker docker compose one.