Tony Yun
09/29/2021, 3:57 PM
Kevin Kho
`docker pull ecr:image_address`, and as long as you're authenticated it should work… or are you asking about Prefect specifically? (AWS docs)
Tony Yun
09/29/2021, 4:00 PM
`class prefect.tasks.docker.images.PullImage(repository=None, tag=None, docker_server_url="unix:///var/run/docker.sock", extra_docker_kwargs=None, stream_logs=False, **kwargs)`
Tony Yun
09/29/2021, 4:01 PM
Kevin Kho
`docker pull` under the hood
Tony Yun
09/29/2021, 4:02 PM
Tony Yun
09/29/2021, 4:07 PM
```
@task(**task_configs)
def run():
    prefect.tasks.docker.images.PullImage.run(
        repository='49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
    )

with Flow(
    "DBT Runs",
    # schedule=schedule,
    storage=storage,
    run_config=KubernetesRun()
) as flow:
    run()
```
```
File "flow.py", line 41, in run
    repository='492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
TypeError: method() missing 1 required positional argument: 'self'
```
Is there an example of how this should be run?
Tony Yun
09/29/2021, 4:08 PM
Tony Yun
09/29/2021, 4:12 PM
Kevin Kho
```
@task(**task_configs)
def run():
    prefect.tasks.docker.images.PullImage().run(
        repository='49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
    )
```
might work, but really I think you should do
Tony Yun
09/29/2021, 4:13 PM
Kevin Kho
```
run = prefect.tasks.docker.images.PullImage(**task_configs)
with Flow(...):
    run(
        repository='49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
    )
```
Tony Yun
09/29/2021, 4:16 PM
Tony Yun
09/29/2021, 4:16 PM
```
[2021-09-29 12:15:32-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'DBT Runs'
[2021-09-29 12:15:32-0400] INFO - prefect.TaskRunner | Task 'PullImage': Starting task run...
[2021-09-29 12:15:33-0400] INFO - prefect.TaskRunner | Task 'PullImage': Finished task run for task with final state: 'Success'
[2021-09-29 12:15:33-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
Kevin Kho
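The `TypeError` in the 4:07 PM traceback comes from calling `run` on the `PullImage` class itself, so Python has no instance to bind to `self`; `PullImage().run(...)` supplies one. The same mistake can be reproduced in plain Python with a hypothetical stand-in class (`PullImageLike` is illustrative only, not Prefect code):

```python
class PullImageLike:
    """Hypothetical stand-in for a task class; NOT Prefect's PullImage."""

    def __init__(self, tag="latest"):
        self.tag = tag

    def run(self, repository, tag=None):
        # An ordinary instance method: `self` must be bound by calling it on an instance.
        return f"{repository}:{tag or self.tag}"


def call_on_class():
    """Mirror `PullImage.run(repository=..., tag=...)`: no instance, so no `self`."""
    try:
        PullImageLike.run(repository="test_dbt", tag="latest")
    except TypeError as exc:
        return str(exc)
    return None


def call_on_instance():
    """Mirror `PullImage().run(...)`: the new instance is bound to `self`."""
    return PullImageLike().run(repository="test_dbt", tag="latest")
```

In a Prefect 0.x flow, the second pattern Kevin suggests (instantiate the task once, then call the instance inside `with Flow(...)`) also keeps `PullImage` as its own task in the flow, instead of hiding it inside another task's `run`.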
Tony Yun
09/29/2021, 4:17 PM
Kevin Kho
Tony Yun
09/29/2021, 4:17 PM
Kevin Kho
`docker image rm` maybe?
Tony Yun
09/29/2021, 4:18 PM
Kevin Kho
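Kevin's first answer, that `docker pull` against ECR works as long as you're authenticated, can be sketched outside Prefect with boto3 and the Docker SDK for Python. This is a hedged sketch, not what `PullImage` does internally: it assumes AWS credentials are available to boto3, uses `us-east-1` only because that is the region in the repository URL, and the helper names are hypothetical. ECR's `GetAuthorizationToken` returns a base64-encoded `AWS:<password>` string, which is split apart before logging in:

```python
import base64
from typing import Tuple


def decode_ecr_authorization_token(token: str) -> Tuple[str, str]:
    """Split an ECR authorization token into (username, password).

    The decoded token has the form "AWS:<password>"; split on the first colon only.
    """
    username, _, password = base64.b64decode(token).decode("utf-8").partition(":")
    return username, password


def login_and_pull(repository: str, tag: str = "latest", region: str = "us-east-1"):
    """Log the Docker daemon in to ECR, then pull repository:tag.

    Needs boto3, the docker SDK, AWS credentials, and a reachable Docker daemon.
    """
    import boto3  # imported lazily so the decoder above works without these packages
    import docker

    auth = boto3.client("ecr", region_name=region).get_authorization_token()
    data = auth["authorizationData"][0]
    username, password = decode_ecr_authorization_token(data["authorizationToken"])

    client = docker.from_env()
    client.login(username=username, password=password, registry=data["proxyEndpoint"])
    return client.images.pull(repository, tag=tag)
```

From a shell, the usual equivalent is `aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <registry>`. ECR tokens expire after 12 hours, so a long-lived daemon may need to log in again.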
Tony Yun
09/29/2021, 4:21 PM
Tony Yun
09/29/2021, 4:22 PM
`docker image rm 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt`
and then `python flow.py`
Kevin Kho
Kevin Kho
`stream_logs=True` for the Docker task.
Tony Yun
09/29/2021, 4:23 PM
Tony Yun
09/29/2021, 4:23 PM
Kevin Kho
`export PREFECT__LOGGING__LEVEL=DEBUG`
Tony Yun
09/29/2021, 4:25 PM
```
with Flow(
    "DBT Runs",
    # schedule=schedule,
    storage=storage,
    run_config=KubernetesRun()
) as flow:
    run(
        repository='492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest',
        stream_logs=True
    )
```
OK, let me add that export.
Tony Yun
09/29/2021, 4:25 PM
```
[2021-09-29 12:25:16-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'DBT Runs'
[2021-09-29 12:25:16-0400] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-09-29 12:25:16-0400] DEBUG - prefect.FlowRunner | Flow 'DBT Runs': Handling state change from Scheduled to Running
[2021-09-29 12:25:16-0400] INFO - prefect.TaskRunner | Task 'PullImage': Starting task run...
[2021-09-29 12:25:16-0400] DEBUG - prefect.TaskRunner | Task 'PullImage': Handling state change from Pending to Running
[2021-09-29 12:25:16-0400] DEBUG - prefect.TaskRunner | Task 'PullImage': Calling task.run() method...
[2021-09-29 12:25:16-0400] DEBUG - prefect.PullImage | Pulling image 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt:latest
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Pulling from test_dbt
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Digest: sha256:41e235e3ab6029d2edfc0acece52205232061de04a24616166aa4fb343ba3f0d
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Status: Image is up to date for 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt:latest
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Pulled image 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt:latest
[2021-09-29 12:25:17-0400] DEBUG - prefect.TaskRunner | Task 'PullImage': Handling state change from Running to Success
[2021-09-29 12:25:17-0400] INFO - prefect.TaskRunner | Task 'PullImage': Finished task run for task with final state: 'Success'
[2021-09-29 12:25:17-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-09-29 12:25:17-0400] DEBUG - prefect.FlowRunner | Flow 'DBT Runs': Handling state change from Running to Success
```
It seems it just pulls the image but doesn't run it.
Kevin Kho
`CreateContainer` and `StartContainer` tasks
Tony Yun
09/29/2021, 4:27 PM
Kevin Kho
`StopContainer` and `RemoveContainer`
Tony Yun
09/29/2021, 4:30 PM
Kevin Kho
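Kevin's pointers describe a lifecycle of pull, create, start, wait, read logs, and remove. A minimal sketch of that lifecycle run directly against the Docker SDK's low-level `APIClient`, which the Prefect Docker tasks appear to wrap (the task names in the comments are a rough mapping, not a guarantee). `run_image_and_collect_logs` is a hypothetical helper, and the client is passed in so a fake can stand in for a real daemon. Pulling before `create_container` matters: creating a container from an image that is not present locally fails with `ImageNotFound`, as happened with `prefecthq/prefect:latest` later in this thread.

```python
def run_image_and_collect_logs(client, repository, tag="latest", command=None):
    """Pull, create, start, wait on, read logs from, and remove one container.

    `client` is expected to behave like docker.APIClient (docker-py's low-level client).
    Returns (exit status code, decoded logs).
    """
    image = f"{repository}:{tag}"
    client.pull(repository, tag=tag)                                    # ~ PullImage
    container_id = client.create_container(image=image, command=command)["Id"]  # ~ CreateContainer
    try:
        client.start(container=container_id)                            # ~ StartContainer
        result = client.wait(container=container_id)                    # ~ WaitOnContainer
        logs = client.logs(container=container_id)                     # ~ GetContainerLogs
        return result["StatusCode"], logs.decode("utf-8", errors="replace")
    finally:
        # ~ RemoveContainer; force=True also removes it if it is somehow still running.
        client.remove_container(container=container_id, force=True)
```

With a real daemon, the client would be something like `docker.APIClient(base_url="unix:///var/run/docker.sock")`. Removing the container in `finally` keeps failed runs from leaving stopped containers behind.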
Tony Yun
09/29/2021, 4:33 PM
"This is an adaptation of the Docker Pipeline example where the `prefecthq/prefect:latest` image is pulled and a container is started using that image to run another Flow inside that container."
Is it still available somewhere else?
Kevin Kho
Tony Yun
09/29/2021, 4:36 PM
Tony Yun
09/29/2021, 7:27 PM
Kevin Kho
Tony Yun
09/29/2021, 7:32 PM
Tony Yun
09/29/2021, 7:34 PM
1. If I don't give the `docker_server_url="tcp://localhost:2375"` option (given by your example), it fails with this error:
```
docker.errors.APIError: 400 Client Error for http+docker://localhost/v1.41/containers/create: Bad Request ("invalid reference format")
```
2. If I give that option, it fails with this error:
```
docker.errors.DockerException: Error while fetching server API version: HTTPConnectionPool(host='localhost', port=2375): Max retries exceeded with url: /version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x10837c1d0>: Failed to establish a new connection: [Errno 61] Connection refused'))
```
Kevin Kho
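`[Errno 61] Connection refused` (the macOS errno for a refused TCP connection) means nothing was listening on port 2375 at that host, so the failure happens before any Docker API call is made. On a Mac, Docker Desktop typically serves its API on the Unix socket (the `PullImage` default `unix:///var/run/docker.sock`) rather than on `tcp://localhost:2375`; a TCP URL like that presumably assumes a setup where a daemon, such as a Docker-in-Docker sidecar, listens there. A hypothetical standard-library probe to check whether anything is listening before pointing the Docker tasks at a TCP URL:

```python
import socket
from urllib.parse import urlparse


def tcp_docker_endpoint_listening(url: str, timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections at a tcp://host:port URL.

    This only shows that the port is open; it does not prove a Docker daemon answers there.
    """
    parsed = urlparse(url)
    if parsed.scheme != "tcp" or parsed.hostname is None or parsed.port is None:
        raise ValueError(f"expected tcp://host:port, got {url!r}")
    try:
        with socket.create_connection((parsed.hostname, parsed.port), timeout=timeout):
            return True
    except OSError:  # ConnectionRefusedError and timeouts are both OSError subclasses
        return False
```

For example, `tcp_docker_endpoint_listening("tcp://localhost:2375")` returning `False` would match the second error above.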
Tony Yun
09/29/2021, 8:40 PM
```
image = PullImage(
    repository=ECR_REPO,
    tag="latest",
)
create_container = CreateContainer(
    docker_server_url="tcp://localhost:2375",
    image_name=ECR_REPO,
)
start_container = StartContainer(docker_server_url="tcp://localhost:2375")
wait_on_container = WaitOnContainer(docker_server_url="tcp://localhost:2375")

with Flow(
    "DBT Runs",
    storage=storage,
    run_config=KubernetesRun()
) as flow:
    container_id = create_container(image)
    started = start_container(container_id=container_id)
    status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
```
Kevin Kho
`flow.run()`?
Tony Yun
09/29/2021, 9:04 PM
Kevin Kho
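`invalid reference format` is the Docker daemon rejecting the image name it was handed. In the 8:40 PM snippet, `create_container(image)` passes the *result* of the `PullImage` task as the image name; if that result is the pull output rather than an image reference (an assumption about `PullImage`'s return value, not verified here), Docker would reject it exactly this way. The later 6:28 PM version passes `ECR_REPO` to `create_container` directly, which avoids the problem. A rough sanity check, simplified from Docker's reference grammar (an approximation for catching obvious mistakes, not a full validator; the names are hypothetical):

```python
import re

# Simplified from Docker's image reference grammar (domain/path:tag@digest).
_PATH_COMPONENT = r"[a-z0-9]+(?:(?:[._]|__|-+)[a-z0-9]+)*"
_DOMAIN_COMPONENT = r"(?:[a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9-]*[a-zA-Z0-9])"
_DOMAIN = rf"{_DOMAIN_COMPONENT}(?:\.{_DOMAIN_COMPONENT})*(?::[0-9]+)?"
_TAG = r"[\w][\w.-]{0,127}"
_DIGEST = r"[A-Za-z][A-Za-z0-9]*(?:[-_+.][A-Za-z][A-Za-z0-9]*)*:[0-9a-fA-F]{32,}"
_REFERENCE = re.compile(
    rf"(?:{_DOMAIN}/)?{_PATH_COMPONENT}(?:/{_PATH_COMPONENT})*(?::{_TAG})?(?:@{_DIGEST})?"
)


def looks_like_image_reference(value) -> bool:
    """True if `value` is a string shaped like an image reference such as repo:tag."""
    return isinstance(value, str) and _REFERENCE.fullmatch(value) is not None
```

Calling this on whatever is about to be passed as `image_name` (inside a small task, for instance) would surface the mismatch before Docker does.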
Kevin Kho
`tcp://0.0.0.0:2375` and `tcp://127.0.0.1:2375` for the server URL?
Tony Yun
09/29/2021, 10:33 PM
Tony Yun
09/29/2021, 10:33 PM
Tony Yun
09/29/2021, 10:34 PM
Tony Yun
09/29/2021, 10:37 PM
```
docker.errors.DockerException: Error while fetching server API version: HTTPConnectionPool(host='0.0.0.0', port=2375): Max retries exceeded with url: /version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x107c78190>: Failed to establish a new connection: [Errno 61] Connection refused'))
```
`127.0.0.1` and `localhost` both failed for the same reason.
Kevin Kho
Kevin Kho
`docker create` gives you the same issue?
Tony Yun
09/29/2021, 11:05 PM
```
$ docker create test_dbt:latest
c5f89809c7a31e5a0da371acc9d571381b7b29d8540bd0be6ab4f5b198b954f7
```
Tony Yun
09/29/2021, 11:05 PM
Kevin Kho
Kevin Kho
```
from prefect import Flow, task
from prefect.tasks.docker.containers import CreateContainer, StartContainer, WaitOnContainer

create = CreateContainer(command=["ls"])
start = StartContainer()
wait = WaitOnContainer()

with Flow("test") as flow:
    id = create("prefecthq/prefect")
    a = start(id)
    wait(id, upstream_tasks=[a])

flow.run()
```
Kevin Kho
Kevin Kho
Tony Yun
09/29/2021, 11:14 PM
```
docker.errors.ImageNotFound: 404 Client Error for http+docker://localhost/v1.41/containers/create: Not Found ("No such image: prefecthq/prefect:latest")
```
Tony Yun
09/29/2021, 11:17 PM
Kevin Kho
Tony Yun
09/30/2021, 2:41 PM
`GetContainerLogs` but no effect.
```
create = CreateContainer(command=["ls"])
start = StartContainer()
wait = WaitOnContainer()
logs = GetContainerLogs()

with Flow("test") as flow:
    id = create("prefecthq/prefect")
    a = start(id)
    status_code = wait(id, upstream_tasks=[a])
    logs(id, upstream_tasks=[status_code])
```
Tony Yun
09/30/2021, 2:45 PM
Kevin Kho
`GetContainerLogs` literally just pulls the logs and doesn't pass them on automatically. If you want them shown, I think you need to pass them to another task that logs them, but they will arrive all at once; they won't be streamed. The Docker tasks don't seem to have any streaming functionality.
Tony Yun
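09/30/2021, 2:48 PM
Kevin Kho
Tony Yun
09/30/2021, 2:54 PM
Tony Yun
09/30/2021, 2:54 PM
Kevin Kho
```
import prefect
from prefect import task

@task
def mytask(container):  # a docker SDK Container, e.g. client.containers.get(container_id)
    logger = prefect.context.get("logger")
    for line in container.logs(stream=True):
        logger.info(line.decode("utf-8", errors="replace").strip())
```
Taken from here
Tony Yun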
09/30/2021, 3:00 PM
Containers
`attach(container, stdout=True, stderr=True, stream=False, logs=False, demux=False)`
Attach to a container.
The `.logs()` function is a wrapper around this method, which you can use instead if you want to fetch/stream container output without first retrieving the entire backlog.
But I'm not sure how to do `attach` in Prefect.
Tony Yun
09/30/2021, 3:00 PM
Kevin Kho
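Since the built-in Docker tasks don't stream output, `attach` would have to be called from a custom task. A minimal sketch, assuming the Docker SDK's low-level `APIClient`: with `stream=True`, `attach` yields raw byte chunks that need not line up with line breaks, so the helper re-assembles complete lines before handing each one to a logging callback. `iter_lines` and `stream_container_output` are hypothetical names, not Prefect or Docker SDK APIs.

```python
from typing import Callable, Iterable, Iterator


def iter_lines(chunks: Iterable[bytes]) -> Iterator[str]:
    """Re-assemble arbitrary byte chunks into complete, decoded lines."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        *complete, buffer = buffer.split(b"\n")  # last piece may be a partial line
        for line in complete:
            yield line.decode("utf-8", errors="replace").rstrip("\r")
    if buffer:  # trailing output without a final newline
        yield buffer.decode("utf-8", errors="replace")


def stream_container_output(client, container_id: str, log: Callable[[str], None]) -> int:
    """Send a container's output to `log` line by line as it arrives; return the line count.

    `client` is expected to behave like docker.APIClient.
    """
    count = 0
    for line in iter_lines(client.attach(container=container_id, stream=True, logs=True)):
        log(line)
        count += 1
    return count
```

Inside a Prefect 0.x task, `log` could be `prefect.context.get("logger").info` and `client` a `docker.APIClient(base_url=...)`, called after `StartContainer`; `logs=True` asks Docker to replay output produced before the attach.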
Tony Yun
09/30/2021, 3:31 PM
Tony Yun
09/30/2021, 3:31 PM
Kevin Kho
Kevin Kho
Tony Yun
09/30/2021, 4:03 PM
Tony Yun
09/30/2021, 4:03 PM
Kevin Kho
Tony Yun
09/30/2021, 4:07 PM
```
storage = Docker(
    base_image="prefecthq/prefect:0.14.9-python3.7"
)
```
exception:
```
  File "/usr/local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
FileNotFoundError: [Errno 2] No such file or directory
```
Kevin Kho
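The `FileNotFoundError` raised from `unixconn.py` means docker-py tried the default Unix socket (`unix:///var/run/docker.sock`) and no such file exists inside the flow's container, so there is no Docker daemon to talk to. A hypothetical helper that picks a Docker URL the way `docker.from_env()` broadly does (`DOCKER_HOST` first, then the default socket) and fails early with a clearer message; the resolved URL could then be passed as `docker_server_url=` to the Prefect Docker tasks:

```python
import os
from typing import Callable, Mapping, Optional

DEFAULT_SOCKET = "/var/run/docker.sock"


def resolve_docker_base_url(
    environ: Optional[Mapping[str, str]] = None,
    socket_path: str = DEFAULT_SOCKET,
    exists: Callable[[str], bool] = os.path.exists,
) -> str:
    """Return DOCKER_HOST if set, else the Unix socket URL if the socket file exists."""
    env = os.environ if environ is None else environ
    if env.get("DOCKER_HOST"):
        return env["DOCKER_HOST"]
    if exists(socket_path):
        return f"unix://{socket_path}"
    raise RuntimeError(
        f"No DOCKER_HOST set and {socket_path} does not exist: this process cannot reach "
        "a Docker daemon (e.g. no socket mounted and no Docker-in-Docker sidecar)."
    )
```

The `environ` and `exists` parameters exist only so the lookup can be exercised without a real daemon.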
Kevin Kho
Tony Yun
09/30/2021, 4:14 PM
Tony Yun
09/30/2021, 4:17 PM
Kevin Kho
Tony Yun
09/30/2021, 4:19 PM
Tony Yun
09/30/2021, 4:30 PM
Docker Tasks
Collection of tasks for orchestrating Docker images and containers.
_Note_: If running these tasks from inside of a docker container itself there are some extra requirements needed for that to work. The container needs to be able to talk to a Docker server. There are a few ways to accomplish this:
1. Use a base image that has Docker installed and running (e.g. https://hub.docker.com/_/docker)
2. Installing the Docker CLI in the base image
Tony Yun
09/30/2021, 4:30 PM
`prefecthq/prefect:0.14.9-python3.7` does not have the Docker CLI installed
Kevin Kho
Tony Yun
09/30/2021, 4:52 PM
Does `prefecthq` have such an image available to use? Or do I have to build one myself?
Kevin Kho
Tony Yun
09/30/2021, 5:59 PM
Tony Yun
09/30/2021, 6:04 PM
Kevin Kho
Tony Yun
09/30/2021, 6:06 PM
Tony Yun
09/30/2021, 6:06 PM
Kevin Kho
`RunNamespacedJob` instead? Yeah, Docker in Docker gets messy.
Tony Yun
09/30/2021, 6:08 PM
Kevin Kho
Kevin Kho
Tony Yun
09/30/2021, 6:10 PM
Tony Yun
09/30/2021, 6:11 PM
Tony Yun
09/30/2021, 6:17 PM
```
storage = Docker(
    registry_url="gcr.io/dev/", image_name="docker-on-k8s-flow", image_tag="0.1.0"
)
```
Tony Yun
09/30/2021, 6:18 PM
Kevin Kho
Tony Yun
09/30/2021, 6:19 PM
Tony Yun
09/30/2021, 6:19 PM
Tony Yun
09/30/2021, 6:21 PM
```
Task 'PullImage': Starting task run...
14:20:34 ERROR CloudTaskRunner
Unexpected error: DockerException("Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))")
Traceback (most recent call last):
```
Tony Yun
09/30/2021, 6:21 PM
`docker-on-k8s-flow`
Tony Yun
09/30/2021, 6:21 PM
Kevin Kho
Tony Yun
09/30/2021, 6:24 PM
Tony Yun
09/30/2021, 6:24 PM
Kevin Kho
Kevin Kho
Tony Yun
09/30/2021, 6:26 PM
Tony Yun
09/30/2021, 6:28 PM
```
with Flow(
    "DBT flow",
    storage=storage,
    run_config=KubernetesRun(job_template=job_template)
) as flow:
    pullImage(repository=ECR_REPO, tag="latest")
    container_id = create_container(ECR_REPO)
    started = start_container(container_id=container_id)
    status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
    logs = get_logs(container_id, upstream_tasks=[status_code])
    printLogs(logs)

flow.run_config = KubernetesRun(job_template=job_template)
flow.storage = Docker(
    base_image="prefecthq/prefect:0.14.9-python3.7",
    registry_url="492466275335.dkr.ecr.us-east-1.amazonaws.com/prefect/",
)
```
Tony Yun
09/30/2021, 6:29 PM
Kevin Kho
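A `tcp://localhost:2375` Docker URL only works when a Docker daemon is listening on the pod's own localhost, which is what a Docker-in-Docker sidecar provides, and a job template only takes effect for registered flows run by a Kubernetes agent, since `flow.run()` does not use the run config. A hedged sketch of such a template as a Python dict; it is not the template from Prefect's example, and it assumes (1) Prefect's agent fills in the image and command of the first container, (2) the `docker:dind` image with an empty `DOCKER_TLS_CERTDIR` serves the API without TLS on port 2375, and (3) the cluster allows privileged pods. `dind_job_template` is a hypothetical name.

```python
DIND_DOCKER_HOST = "tcp://localhost:2375"  # containers in one pod share a network namespace


def dind_job_template() -> dict:
    """Kubernetes Job template with a Docker-in-Docker sidecar (hypothetical sketch)."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            # Assumed: Prefect fills in this container's image and command.
                            "name": "flow",
                            # Read by the docker CLI and docker.from_env(); the Prefect
                            # Docker tasks take docker_server_url instead.
                            "env": [{"name": "DOCKER_HOST", "value": DIND_DOCKER_HOST}],
                        },
                        {
                            "name": "dind",
                            "image": "docker:dind",
                            # Empty value disables TLS, so dockerd listens on plain TCP 2375.
                            "env": [{"name": "DOCKER_TLS_CERTDIR", "value": ""}],
                            "securityContext": {"privileged": True},
                        },
                    ]
                }
            }
        },
    }
```

Assuming `job_template` accepts a dict, it would be passed as `KubernetesRun(job_template=dind_job_template())`, with the Docker tasks given `docker_server_url="tcp://localhost:2375"`. One caveat: a Job's pod only completes when all of its containers exit, so a `dind` sidecar that keeps running can stop the Job from finishing unless something shuts it down.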
Kevin Kho
`flow.run()` because the `KubernetesRun` is the one handling that Docker-in-Docker setup, and `flow.run()` does not use the `RunConfig`. You also need the whole job template in the example.
Tony Yun
09/30/2021, 6:31 PM
Tony Yun
09/30/2021, 6:31 PM