
VildMedPap

09/30/2021, 1:30 PM
Hi Community 👋🏼 My colleague @Claus Vestergaard and I are facing a Docker challenge with Prefect. We have searched and read multiple questions and answers in this Slack team, but we are still unsure whether our intended use case is achievable with Prefect.
Case
We have a Docker image with Python code which processes a csv file and returns a json file (the files are exchanged through a bind mount):
docker build -t my_processor .
docker run -v ${PWD}/data:/app/data my_processor
Here the raw data (the csv file) is located in /data/raw, and the processed file (the json file) will be written by the container to /data/processed.
What we wish to achieve - 3 tasks
- Task #1) Fetch data from the source (S3) and temporarily store it in /data/raw
- Task #2) Run the container with a bind mount to the /data directory
- Task #3) Push data from /data/processed to the destination (S3)
Our setup
- The Prefect server is self-hosted on an EC2 instance
Dockerfile used to build the image
FROM python:3.7-slim
WORKDIR /app
COPY . .
RUN pip install .
CMD ["python3", "preprocess.py"]
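For context, a minimal sketch of what a preprocess.py entry point like this might look like. The actual processing logic belongs to the poster and is not shown in the thread; the csv-to-json conversion and the directory layout below are assumptions based on the description of /data/raw and /data/processed:

```python
# preprocess.py -- hypothetical sketch of the container's entry point:
# read csv files from data/raw, write json files to data/processed.
import csv
import json
from pathlib import Path

RAW_DIR = Path("data/raw")
PROCESSED_DIR = Path("data/processed")


def preprocess(src: Path, dst: Path) -> None:
    """Read one csv file and write its rows as a json array."""
    with src.open(newline="") as f:
        rows = list(csv.DictReader(f))
    dst.parent.mkdir(parents=True, exist_ok=True)
    dst.write_text(json.dumps(rows, indent=2))


if __name__ == "__main__":
    for csv_file in RAW_DIR.glob("*.csv"):
        preprocess(csv_file, PROCESSED_DIR / f"{csv_file.stem}.json")
```

Because the container's WORKDIR is /app, the bind mount to /app/data makes these relative paths line up with the host's ./data directory.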
Skeleton for our Flow
flow.py
from prefect import Flow, task
...

@task(name="Get data")
def get_data_from_s3():
    """Retrieve csv from S3 and saves as /data/raw/file.csv"""
    pass


@task(name="Process data")
def process_through_docker_image():
    """Start docker container which
        - reads the csv-file
        - processes the data
        - writes as json
    """
    pass

@task(name="Push data")
def upload_to_s3():
    """Push data to S3"""
    pass

with Flow("foo") as f:
    # First get the data from S3
    raw = get_data_from_s3()

    # Then process the data, once the raw file is in place
    processed = process_through_docker_image(upstream_tasks=[raw])

    # Finally, upload the processed data to S3
    upload_to_s3(upstream_tasks=[processed])

f.register()
Questions:
1. Is Prefect intended to be used this way?
2. Is it possible to use an already built Docker image in the way described above?
3. If so, how do we do it? 😬

Kevin Kho

09/30/2021, 4:49 PM
Hey @VildMedPap, I am pretty sure Prefect can handle this. The first thing to know is that there are two ways to run a Docker container. The first is using the DockerRun run configuration, which runs the entire flow in the container. The second, which is what I think you want, is to use the Prefect Docker tasks like CreateContainer, StartContainer, WaitOnContainer, StopContainer. These let you spin up containers from within a flow. The next part of your question is how to mount a volume, I think. I will post below how to do that.
It will be like this:
from prefect import Flow, task
from prefect.tasks.docker import (
    PullImage,
    CreateContainer,
    StartContainer,
    GetContainerLogs,
    WaitOnContainer,
    RemoveContainer,
)
from prefect.triggers import always_run


@task
def log_output(output):
    print(output)


pull_image = PullImage(repository="prefecthq/prefect")
container = CreateContainer(
    image_name="prefecthq/prefect",
    command="ls /home",
    volumes=["/Users/ruslantau/PycharmProjects/prefect/data:/home/data"],
    host_config={"binds": ["/Users/ruslantau/PycharmProjects/prefect/data:/home/data"]},
)
start = StartContainer()
logs = GetContainerLogs(trigger=always_run)
status_code = WaitOnContainer()
remove_container = RemoveContainer(trigger=always_run)

with Flow(name="Volumes issue") as flow:
    image = pull_image()
    start_container = start(container_id=container, upstream_tasks=[image])
    code = status_code(container_id=container, upstream_tasks=[start_container])
    collect_logs = logs(container_id=container, upstream_tasks=[code])
    logged = log_output(collect_logs)
    removed = remove_container(container_id=container, upstream_tasks=[logged])
Note you need the drive in two places: in volumes and in host_config. But I think you can get the paths programmatically after you fetch the data from S3 and then pass them to CreateContainer.
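As a sketch of that last point, a small helper (hypothetical, not part of Prefect) can build both CreateContainer arguments from a single bind string so volumes and host_config can never drift apart:

```python
# Hypothetical helper: derive the `volumes` and `host_config` arguments
# for CreateContainer from one local path / container path pair.
def volume_kwargs(local_dir: str, container_dir: str) -> dict:
    bind = f"{local_dir}:{container_dir}"
    return {
        "volumes": [bind],
        "host_config": {"binds": [bind]},
    }


# Example: CreateContainer(image_name="my_processor", **volume_kwargs("/tmp/data", "/app/data"))
print(volume_kwargs("/tmp/data", "/app/data"))
```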

VildMedPap

10/01/2021, 2:42 PM
Hi Kevin. Thank you so much for your pointers. We got flows working in both cases:
- Using a local agent with the Docker image
- Using a Docker agent with the flow running in a Docker container
For purely visual purposes, do you know of any way to “squash” the Docker tasks into one task? So instead of seeing “CreateContainer”, “StartContainer”, etc. in the UI schematic, you’d only see something like “Processing container”?

Kevin Kho

10/01/2021, 2:43 PM
You can use a task inside another task by calling its .run() method:
@task
def another_task():
    container_id = CreateContainer(image_name="my_processor").run()
    StartContainer().run(container_id=container_id)
    WaitOnContainer().run(container_id=container_id)

VildMedPap

10/05/2021, 6:20 AM
Hi Kevin. Your help is greatly appreciated! We got the Docker tasks squashed into one task through the run method. Previously we declared the internal dependencies between the Docker tasks through the upstream_tasks argument, but that argument isn’t available in the run method. How would we go about setting the upstream_tasks when we invoke the tasks with the run method?

Kevin Kho

10/05/2021, 1:46 PM
You can use upstream_tasks in the .run() inside the Flow object. I think Prefect does something to handle it.

VildMedPap

10/05/2021, 4:53 PM
Yeah, but it’s inside a task that I want to set the upstream dependencies between the “subtasks” (the Docker tasks).

Kevin Kho

10/05/2021, 4:57 PM
I see what you mean. upstream_tasks is for constructing the DAG at the flow level. I think execution will be sequential inside a task though. Is it not?