VildMedPap
09/30/2021, 1:30 PM
docker build -t my_processor .
docker run -v ${PWD}/data:/app/data my_processor
Here the raw data (the csv file) is located in /data/raw and the processed file (the json file) will be written by the container to /data/processed.
What we wish to achieve
- 3 tasks
- Task #1) Fetch data from source (S3) and temporarily store it in /data/raw
- Task #2) Run the container with bind mount to /data directory
- Task #3) Push data from /data/processed to destination (S3)
Our setup
- The Prefect server is self-hosted on an EC2 instance
Dockerfile used to build the image
FROM python:3.7-slim
WORKDIR /app
COPY . .
RUN pip install .
CMD ["python3", "preprocess.py"]
Skeleton for our Flow (flow.py)
from prefect import Flow, task
...

@task(name="Get data")
def get_data_from_s3():
    """Retrieve the csv from S3 and save it as /data/raw/file.csv"""
    pass

@task(name="Process data")
def process_through_docker_image():
    """Start the docker container which
    - reads the csv file
    - processes the data
    - writes the result as json
    """
    pass

@task(name="Push data")
def upload_to_s3():
    """Push data to S3"""
    pass

with Flow("foo") as f:
    # First get data: get_data_from_s3()
    ...
    # Then process the data: process_through_docker_image()
    ...
    # Finally, upload the processed data to S3: upload_to_s3()
    ...

f.register()
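One way the flow body could be wired once the tasks are implemented, assuming the dependencies are declared with upstream_tasks rather than by passing data between tasks:

with Flow("foo") as f:
    raw = get_data_from_s3()
    processed = process_through_docker_image(upstream_tasks=[raw])
    upload_to_s3(upstream_tasks=[processed])

f.register()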
Questions:
1. Is Prefect intended to be used this way?
2. Is it possible to use an already built Docker image in the way described above?
3. If so, how to do it?
Kevin Kho
09/30/2021, 4:49 PM
from prefect import Flow
from prefect.tasks.docker import (
    CreateContainer,
    StartContainer,
    GetContainerLogs,
    WaitOnContainer,
    RemoveContainer,
)
from prefect.triggers import always_run

# pull_image and log are assumed to be defined elsewhere,
# e.g. a prefect.tasks.docker.PullImage task and a small logging @task.
container = CreateContainer(
    image_name="prefecthq/prefect",
    command='ls /home',
    volumes=['/Users/ruslantau/PycharmProjects/prefect/data:/home/data'],
    host_config={"binds": ['/Users/ruslantau/PycharmProjects/prefect/data:/home/data']},
)
start = StartContainer()
logs = GetContainerLogs(trigger=always_run)
status_code = WaitOnContainer()
remove_container = RemoveContainer(trigger=always_run)

with Flow(name="Volumes issue") as flow:
    image = pull_image()
    start_container = start(container_id=container)
    code = status_code(container_id=container, upstream_tasks=[start_container])
    collect_logs = logs(container_id=container, upstream_tasks=[code])
    logs = log([collect_logs])
    removed = remove_container(container_id=container, upstream_tasks=[logs])

Note that the bind mount is passed both in volumes and in host_config of CreateContainer.
VildMedPap
10/01/2021, 2:42 PM
Kevin Kho
10/01/2021, 2:43 PM
You can also call a task's .run() method inside another task:
@task
def another_task():
    CreateContainer.run()
VildMedPap
10/05/2021, 6:20 AM
Regarding the run method: we declared the internal dependencies of the Docker tasks through the upstream_tasks argument. This isn't available in the run method. How would we go about setting the upstream_tasks when we activate the task with the run method?
Kevin Kho
10/05/2021, 1:46 PM
upstream_tasks only applies to the .run() inside the Flow object. I think Prefect does something to handle it.
VildMedPap
10/05/2021, 4:53 PM
Kevin Kho
10/05/2021, 4:57 PM
upstream_tasks is for constructing the DAG on the flow level. I think execution will be sequential inside a task though. Is it not?
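For reference, a rough sketch of what the .run()-inside-a-task approach discussed above could look like, with the Docker task calls executed sequentially inside a single @task so no upstream_tasks are needed. The image name and bind-mount path are placeholders, not taken from the thread, and the exact return values of the Docker tasks should be checked against the Prefect 1.x docs:

from prefect import Flow, task
from prefect.tasks.docker import (
    CreateContainer,
    StartContainer,
    WaitOnContainer,
    GetContainerLogs,
    RemoveContainer,
)

@task(name="Process data")
def process_through_docker_image():
    # Everything below runs sequentially inside this one task.
    container_id = CreateContainer(
        image_name="my_processor",  # placeholder image name
        volumes=["/home/ec2-user/data:/app/data"],  # placeholder bind mount
        host_config={"binds": ["/home/ec2-user/data:/app/data"]},
    ).run()
    StartContainer().run(container_id=container_id)
    WaitOnContainer().run(container_id=container_id)
    container_logs = GetContainerLogs().run(container_id=container_id)
    RemoveContainer().run(container_id=container_id)
    return container_logs

with Flow("foo") as f:
    process_through_docker_image()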