Tony Yun
09/29/2021, 3:57 PM
Kevin Kho
docker pull ecr:image_address
and as long as you’re authenticated it should work… or are you asking about Prefect specifically? aws docs
Tony Yun
09/29/2021, 4:00 PM
class prefect.tasks.docker.images.PullImage(repository=None, tag=None, docker_server_url="unix:///var/run/docker.sock", extra_docker_kwargs=None, stream_logs=False, **kwargs)
Tony Yun
09/29/2021, 4:01 PM
Kevin Kho
docker pull
under the hood
Tony Yun
09/29/2021, 4:02 PM
Tony Yun
09/29/2021, 4:07 PM
@task(**task_configs)
def run():
    prefect.tasks.docker.images.PullImage.run(
        repository='49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
    )
with Flow(
    "DBT Runs",
    # schedule=schedule,
    storage=storage,
    run_config=KubernetesRun()
) as flow:
    run()
  File "flow.py", line 41, in run
    repository='492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
TypeError: method() missing 1 required positional argument: 'self'
Is there an example of how this should be run?
Tony Yun
09/29/2021, 4:08 PM
Tony Yun
09/29/2021, 4:12 PM
Kevin Kho
@task(**task_configs)
def run():
    prefect.tasks.docker.images.PullImage().run(
        repository='49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
    )
might work, but really I think you should do
Tony Yun
09/29/2021, 4:13 PM
Kevin Kho
run = prefect.tasks.docker.images.PullImage(**task_configs)
with Flow(...):
    run(
        repository='49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
    )
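A note on why the first attempt raised the TypeError: run is an instance method, so it has to be called on a PullImage() object rather than on the class itself. The sketch below reproduces that with a hypothetical stand-in class (PullImageLike is not part of Prefect), so it runs without Prefect or Docker installed.

```python
class PullImageLike:
    """Hypothetical stand-in for a task class with an instance-level run()."""

    def __init__(self, repository=None, tag=None):
        self.repository = repository
        self.tag = tag

    def run(self, repository=None, tag=None):
        # Fall back to the values given at construction time.
        repository = repository or self.repository
        tag = tag or self.tag
        return f"{repository}:{tag}"


def call_on_class():
    # Calling run() on the class itself leaves `self` unfilled.
    try:
        PullImageLike.run(repository="example/test_dbt", tag="latest")
    except TypeError as exc:
        return str(exc)
    return None


def call_on_instance():
    # Instantiating first binds `self`, so the call succeeds.
    return PullImageLike().run(repository="example/test_dbt", tag="latest")


print(call_on_class())
print(call_on_instance())
```

In a flow, the second form from the thread (instantiate once, then call the instance inside the Flow block) leaves the run call to Prefect.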
Tony Yun
09/29/2021, 4:16 PM
Tony Yun
09/29/2021, 4:16 PM
[2021-09-29 12:15:32-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'DBT Runs'
[2021-09-29 12:15:32-0400] INFO - prefect.TaskRunner | Task 'PullImage': Starting task run...
[2021-09-29 12:15:33-0400] INFO - prefect.TaskRunner | Task 'PullImage': Finished task run for task with final state: 'Success'
[2021-09-29 12:15:33-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Kevin Kho
Tony Yun
09/29/2021, 4:17 PM
Kevin Kho
Tony Yun
09/29/2021, 4:17 PM
Kevin Kho
docker image rm
maybe?
Tony Yun
09/29/2021, 4:18 PM
Kevin Kho
Tony Yun
09/29/2021, 4:21 PM
Tony Yun
09/29/2021, 4:22 PM
docker image rm 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt
and then python flow.py
Kevin Kho
Kevin Kho
stream_logs=True
for the Docker task
Tony Yun
09/29/2021, 4:23 PM
Tony Yun
09/29/2021, 4:23 PM
Kevin Kho
export PREFECT__LOGGING__LEVEL=DEBUG
Tony Yun
09/29/2021, 4:25 PM
with Flow(
    "DBT Runs",
    # schedule=schedule,
    storage=storage,
    run_config=KubernetesRun()
) as flow:
    run(
        repository='492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest',
        stream_logs=True
    )
ok let me add that export
Tony Yun
09/29/2021, 4:25 PM
[2021-09-29 12:25:16-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'DBT Runs'
[2021-09-29 12:25:16-0400] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-09-29 12:25:16-0400] DEBUG - prefect.FlowRunner | Flow 'DBT Runs': Handling state change from Scheduled to Running
[2021-09-29 12:25:16-0400] INFO - prefect.TaskRunner | Task 'PullImage': Starting task run...
[2021-09-29 12:25:16-0400] DEBUG - prefect.TaskRunner | Task 'PullImage': Handling state change from Pending to Running
[2021-09-29 12:25:16-0400] DEBUG - prefect.TaskRunner | Task 'PullImage': Calling task.run() method...
[2021-09-29 12:25:16-0400] DEBUG - prefect.PullImage | Pulling image 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt:latest
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Pulling from test_dbt
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Digest: sha256:41e235e3ab6029d2edfc0acece52205232061de04a24616166aa4fb343ba3f0d
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Status: Image is up to date for 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt:latest
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Pulled image 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt:latest
[2021-09-29 12:25:17-0400] DEBUG - prefect.TaskRunner | Task 'PullImage': Handling state change from Running to Success
[2021-09-29 12:25:17-0400] INFO - prefect.TaskRunner | Task 'PullImage': Finished task run for task with final state: 'Success'
[2021-09-29 12:25:17-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-09-29 12:25:17-0400] DEBUG - prefect.FlowRunner | Flow 'DBT Runs': Handling state change from Running to Success
it seems it just pulls the image, but doesn't run it
Kevin Kho
CreateContainer
and StartContainer
tasks
Tony Yun
09/29/2021, 4:27 PM
Kevin Kho
StopContainer
and RemoveContainer
Tony Yun
09/29/2021, 4:30 PM
Kevin Kho
Tony Yun
09/29/2021, 4:33 PM
This is an adaptation of the Docker Pipeline example where the prefecthq/prefect:latest image is pulled and a container is started using that image to run another Flow inside that container.
is it still available somewhere else?
Kevin Kho
Tony Yun
09/29/2021, 4:36 PM
Tony Yun
09/29/2021, 7:27 PM
Kevin Kho
Tony Yun
09/29/2021, 7:32 PM
Tony Yun
09/29/2021, 7:34 PM
docker_server_url="tcp://localhost:2375"
option (given by your example), it fails with this error:
docker.errors.APIError: 400 Client Error for http+docker://localhost/v1.41/containers/create: Bad Request ("invalid reference format")
2. If I give that option, it fails with this error:
docker.errors.DockerException: Error while fetching server API version: HTTPConnectionPool(host='localhost', port=2375): Max retries exceeded with url: /version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x10837c1d0>: Failed to establish a new connection: [Errno 61] Connection refused'))
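The second error can be narrowed down without Prefect. "Connection refused" on port 2375 usually means nothing is listening there; a Docker daemon commonly listens only on its Unix socket unless TCP has been enabled explicitly. A minimal sketch of a reachability check using only the standard library (the helper name tcp_port_open is made up for illustration):

```python
import socket


def tcp_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # 2375 is the port from the thread; False here matches "Connection refused".
    print(tcp_port_open("localhost", 2375))
```

If this prints False, dropping docker_server_url so the task falls back to its default Unix socket is probably the simpler route on a local machine.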
Kevin Kho
Tony Yun
09/29/2021, 8:40 PM
image = PullImage(
    repository=ECR_REPO,
    tag="latest",
)
create_container = CreateContainer(
    docker_server_url="tcp://localhost:2375",
    image_name=ECR_REPO,
)
start_container = StartContainer(docker_server_url="tcp://localhost:2375")
wait_on_container = WaitOnContainer(docker_server_url="tcp://localhost:2375")
with Flow(
    "DBT Runs",
    storage=storage,
    run_config=KubernetesRun()
) as flow:
    container_id = create_container(image)
    started = start_container(container_id=container_id)
    status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
Kevin Kho
flow.run()
?
Tony Yun
09/29/2021, 9:04 PM
Kevin Kho
Kevin Kho
tcp://0.0.0.0:2375
and tcp://127.0.0.1:2375
for the server URL?
Tony Yun
09/29/2021, 10:33 PM
Tony Yun
09/29/2021, 10:33 PM
Tony Yun
09/29/2021, 10:34 PM
Tony Yun
09/29/2021, 10:37 PM
docker.errors.DockerException: Error while fetching server API version: HTTPConnectionPool(host='0.0.0.0', port=2375): Max retries exceeded with url: /version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x107c78190>: Failed to establish a new connection: [Errno 61] Connection refused'))
127.0.0.1 and localhost both failed for the same reason
Kevin Kho
Kevin Kho
docker create
gives you the same issue?
Tony Yun
09/29/2021, 11:05 PM
$ docker create test_dbt:latest ‹ruby-2.6.4›
c5f89809c7a31e5a0da371acc9d571381b7b29d8540bd0be6ab4f5b198b954f7
(criteo)
Tony Yun
09/29/2021, 11:05 PM
Kevin Kho
Kevin Kho
from prefect import Flow, task
from prefect.tasks.docker.containers import CreateContainer, StartContainer, WaitOnContainer
create = CreateContainer(command=["ls"])
start = StartContainer()
wait = WaitOnContainer()
with Flow("test") as flow:
    id = create("prefecthq/prefect")
    a = start(id)
    wait(id, upstream_tasks=[a])
flow.run()
Kevin Kho
Kevin Kho
Tony Yun
09/29/2021, 11:14 PM
docker.errors.ImageNotFound: 404 Client Error for http+docker://localhost/v1.41/containers/create: Not Found ("No such image: prefecthq/prefect:latest")
Tony Yun
09/29/2021, 11:17 PM
Kevin Kho
Tony Yun
09/30/2021, 2:41 PM
GetContainerLogs
but no effect.
create = CreateContainer(command=["ls"])
start = StartContainer()
wait = WaitOnContainer()
logs = GetContainerLogs()
with Flow("test") as flow:
    id = create("prefecthq/prefect")
    a = start(id)
    status_code = wait(id, upstream_tasks=[a])
    logs(id, upstream_tasks=[status_code])
Tony Yun
09/30/2021, 2:45 PM
Kevin Kho
GetContainerLogs
literally just pulls the logs but doesn’t pass them on automatically. If you want them passed on, I think you need to hand them to another task that logs them, but this will be all at once. It won’t be streamed. The Docker tasks don’t seem to have any streaming functionality.
Tony Yun
09/30/2021, 2:48 PM
Kevin Kho
Tony Yun
09/30/2021, 2:54 PM
Tony Yun
09/30/2021, 2:54 PM
Kevin Kho
@task
def mytask(container):
    for line in container.logs(stream=True):
        logger.info(line.strip())
Taken from here
Tony Yun
09/30/2021, 3:00 PM
Containers
attach(container, stdout=True, stderr=True, stream=False, logs=False, demux=False)
Attach to a container.
The .logs() function is a wrapper around this method, which you can use instead if you want to fetch/stream container output without first retrieving the entire backlog.
but not sure how to do attach in prefect
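One way to read the difference discussed above: GetContainerLogs hands back the logs in one piece, while docker-py's container.logs(stream=True) yields output chunk by chunk as bytes. A rough sketch of a helper that forwards each chunk to a logger follows. The names forward_container_logs and FakeContainer are hypothetical; the fake exists only so the example runs without a Docker daemon, and inside a Prefect task the logger would come from the task context instead.

```python
import logging


def forward_container_logs(container, logger):
    """Log each chunk from container.logs(stream=True) as it arrives.

    `container` is assumed to behave like a docker-py Container object,
    whose logs(stream=True) yields bytes. Returns the logged lines.
    """
    lines = []
    for chunk in container.logs(stream=True):
        text = chunk.decode("utf-8", errors="replace").strip()
        if text:  # skip blank chunks
            logger.info(text)
            lines.append(text)
    return lines


class FakeContainer:
    """Hypothetical stand-in that mimics the streaming interface."""

    def __init__(self, chunks):
        self._chunks = chunks

    def logs(self, stream=False):
        if stream:
            return iter(self._chunks)
        return b"".join(self._chunks)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    fake = FakeContainer([b"Running dbt\n", b"\n", b"Done\n"])
    forward_container_logs(fake, logging.getLogger("container"))
```

With a real container, the object would have to come from a docker-py client inside the task itself, since the Prefect Docker tasks in this thread pass container IDs between tasks rather than container objects.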
Tony Yun
09/30/2021, 3:00 PM
Kevin Kho
Tony Yun
09/30/2021, 3:31 PM
Tony Yun
09/30/2021, 3:31 PM
Kevin Kho
Kevin Kho
Tony Yun
09/30/2021, 4:03 PM
Tony Yun
09/30/2021, 4:03 PM
Kevin Kho
Tony Yun
09/30/2021, 4:07 PM
storage = Docker(
    base_image="prefecthq/prefect:0.14.9-python3.7")
exception:
  File "/usr/local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
FileNotFoundError: [Errno 2] No such file or directory
Kevin Kho
Kevin Kho
Tony Yun
09/30/2021, 4:14 PM
Tony Yun
09/30/2021, 4:17 PM
Kevin Kho
Tony Yun
09/30/2021, 4:19 PM
Tony Yun
09/30/2021, 4:30 PM
Docker Tasks
Collection of tasks for orchestrating Docker images and containers.
Note: If running these tasks from inside of a docker container itself there are some extra requirements needed for that to work. The container needs to be able to talk to a Docker server. There are a few ways to accomplish this:
1. Use a base image that has Docker installed and running (e.g. https://hub.docker.com/_/docker)
2. Installing the Docker CLI in the base image
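The FileNotFoundError from unixconn.py earlier in the thread fits this note: inside the flow's container there is no Docker socket to connect to. A small sketch of a pre-flight check, assuming the default socket path shown in the PullImage signature (/var/run/docker.sock); the helper name docker_socket_present is invented for illustration.

```python
import os
import stat


def docker_socket_present(path="/var/run/docker.sock"):
    """Return True if `path` exists and is a Unix domain socket."""
    try:
        mode = os.stat(path).st_mode
    except OSError:
        # Missing path, or no permission to stat it.
        return False
    return stat.S_ISSOCK(mode)


if __name__ == "__main__":
    print(docker_socket_present())
```

A True result only shows that a socket file is present; the daemon behind it still has to accept connections.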
Tony Yun
09/30/2021, 4:30 PM
prefecthq/prefect:0.14.9-python3.7
does not have the Docker CLI installed
Kevin Kho
Tony Yun
09/30/2021, 4:52 PM
prefecthq
have such an image available to use? or do I have to build one myself
Kevin Kho
Tony Yun
09/30/2021, 5:59 PM
Tony Yun
09/30/2021, 6:04 PM
Kevin Kho
Tony Yun
09/30/2021, 6:06 PM
Tony Yun
09/30/2021, 6:06 PM
Kevin Kho
RunNamespacedJob
instead? Yeah Docker in Docker gets messy
Tony Yun
09/30/2021, 6:08 PM
Kevin Kho
Kevin Kho
Tony Yun
09/30/2021, 6:10 PM
Tony Yun
09/30/2021, 6:11 PM
Tony Yun
09/30/2021, 6:17 PM
storage = Docker(
    registry_url="gcr.io/dev/", image_name="docker-on-k8s-flow", image_tag="0.1.0"
)
Tony Yun
09/30/2021, 6:18 PM
Kevin Kho
Tony Yun
09/30/2021, 6:19 PM
Tony Yun
09/30/2021, 6:19 PM
Tony Yun
09/30/2021, 6:21 PM
Task 'PullImage': Starting task run...
14:20:34
ERROR
CloudTaskRunner
Unexpected error: DockerException("Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))")
Traceback (most recent call last):
Tony Yun
09/30/2021, 6:21 PM
docker-on-k8s-flow
Tony Yun
09/30/2021, 6:21 PM
Kevin Kho
Tony Yun
09/30/2021, 6:24 PM
Tony Yun
09/30/2021, 6:24 PM
Kevin Kho
Kevin Kho
Tony Yun
09/30/2021, 6:26 PM
Tony Yun
09/30/2021, 6:28 PM
with Flow(
    "DBT flow",
    storage=storage,
    run_config=KubernetesRun(job_template=job_template)
) as flow:
    pullImage(repository=ECR_REPO, tag="latest")
    container_id = create_container(ECR_REPO)
    started = start_container(container_id=container_id)
    status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
    logs = get_logs(container_id, upstream_tasks=[status_code])
    printLogs(logs)
flow.run_config = KubernetesRun(job_template=job_template)
flow.storage = Docker(
    base_image="prefecthq/prefect:0.14.9-python3.7",
    registry_url="492466275335.dkr.ecr.us-east-1.amazonaws.com/prefect/",
)
Tony Yun
09/30/2021, 6:29 PM
Kevin Kho
Kevin Kho
flow.run()
because the KubernetesRun is the one handling that Docker in Docker setup and flow.run()
does not use the RunConfig. You also need the whole job template in the example.
Tony Yun
09/30/2021, 6:31 PM
Tony Yun
09/30/2021, 6:31 PM