t
Hi, is there a way to pull an image from AWS ECR and run it? The only option I found pulls from a Docker server, which doesn't apply to my case.
k
I believe you can still do docker pull ecr:image_address and, as long as you're authenticated, it should work. Or are you asking about Prefect specifically? (See the AWS docs.)
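For reference, the authentication step can also be done from Python. This is a minimal sketch, assuming the boto3 and docker (Docker SDK for Python) packages are installed and AWS credentials are configured; the function names decode_ecr_token and pull_from_ecr are hypothetical. ECR's GetAuthorizationToken returns a base64 string that decodes to AWS:<password>, which is then handed to docker login for the registry's proxy endpoint, roughly what aws ecr get-login-password piped into docker login does on the command line.

```python
import base64
from typing import Tuple


def decode_ecr_token(authorization_token: str) -> Tuple[str, str]:
    """Split a base64 ECR authorization token into (username, password).

    ECR tokens decode to "AWS:<password>"; split only on the first colon.
    """
    decoded = base64.b64decode(authorization_token).decode("utf-8")
    username, password = decoded.split(":", 1)
    return username, password


def pull_from_ecr(repository: str, tag: str = "latest", region: str = "us-east-1") -> None:
    """Log in to ECR with the current AWS credentials, then pull repository:tag."""
    # Imported here so decode_ecr_token works without these packages installed.
    import boto3
    import docker

    ecr = boto3.client("ecr", region_name=region)
    auth = ecr.get_authorization_token()["authorizationData"][0]
    username, password = decode_ecr_token(auth["authorizationToken"])

    client = docker.from_env()
    client.login(username=username, password=password, registry=auth["proxyEndpoint"])
    client.images.pull(repository, tag=tag)
```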
t
Yes, docker pull will work, but will it work with this task?
```
class prefect.tasks.docker.images.PullImage(repository=None, tag=None, docker_server_url="unix:///var/run/docker.sock", extra_docker_kwargs=None, stream_logs=False, **kwargs)
```
I don't know what docker_server_url I should provide to point it at ECR. Or maybe I'll try putting the entire repo string in repository.
k
Ahh, I see what you mean. I believe it should work if you put the ECR repository there, because it just uses docker pull under the hood.
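A minimal sketch of how a full ECR image reference splits into the values PullImage expects, assuming the standard <account>.dkr.ecr.<region>.amazonaws.com/<name>[:<tag>] form; parse_ecr_image and EcrImage are hypothetical helpers. The registry host stays inside repository, while docker_server_url keeps pointing at the local Docker daemon (unix:///var/run/docker.sock by default), not at ECR.

```python
import re
from typing import NamedTuple

# <account>.dkr.ecr.<region>.amazonaws.com/<name>[:<tag>]
ECR_PATTERN = re.compile(
    r"^(?P<account>[^./]+)\.dkr\.ecr\.(?P<region>[a-z0-9-]+)\.amazonaws\.com/"
    r"(?P<name>[^:@]+)(?::(?P<tag>[^:@/]+))?$"
)


class EcrImage(NamedTuple):
    repository: str  # full repository string, registry host included
    tag: str
    region: str      # needed separately for the ECR login step


def parse_ecr_image(reference: str, default_tag: str = "latest") -> EcrImage:
    """Split an ECR image reference into repository, tag and region."""
    match = ECR_PATTERN.match(reference)
    if match is None:
        raise ValueError(f"not an ECR image reference: {reference!r}")
    registry = reference.split("/", 1)[0]
    return EcrImage(
        repository=f"{registry}/{match['name']}",
        tag=match["tag"] or default_tag,
        region=match["region"],
    )
```

For example, parse_ecr_image("49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt") gives repository="49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt", tag="latest" and region="us-east-1".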
t
got it. thanks!
Hmm, I defined a very simple task, but it failed with an exception:
```python
@task(**task_configs)
def run():
    prefect.tasks.docker.images.PullImage.run(
        repository='49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
    )


with Flow(
    "DBT Runs",
    # schedule=schedule,
    storage=storage,
    run_config=KubernetesRun()
) as flow:
    run()
```
```
File "flow.py", line 41, in run
    repository='492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
TypeError: method() missing 1 required positional argument: 'self'
```
Is there an example of how this should be run?
Oh, I guess I have to instantiate the class first.
Hmm, that doesn't work out either. I need an example of the call...
k
```python
@task(**task_configs)
def run():
    prefect.tasks.docker.images.PullImage().run(
        repository='49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
    )
```
That might work, but really I think you should do:
t
yep let me try
k
```python
run = prefect.tasks.docker.images.PullImage(**task_configs)

with Flow(...):
    run(
        repository='49246627xxx.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest'
    )
```
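The TypeError above comes from calling run on the class rather than on an instance, so Python has nothing to bind to self. A plain-Python illustration, with PullImageLike as a hypothetical stand-in for the Prefect task class (it does not pull anything):

```python
class PullImageLike:
    """Hypothetical stand-in for a Prefect Task class such as PullImage."""

    def __init__(self, **task_kwargs):
        self.task_kwargs = task_kwargs

    def run(self, repository=None, tag=None):
        return f"{repository}:{tag}"


# Calling run on the class leaves `self` unfilled: the same TypeError as above.
try:
    PullImageLike.run(repository="example/repo", tag="latest")
except TypeError as exc:
    unbound_error = str(exc)

# Instantiating first binds `self`, so the call succeeds.
pull = PullImageLike(name="pull")
result = pull.run(repository="example/repo", tag="latest")
```

In Prefect 1.x, calling an instantiated task inside a with Flow(...) block adds it to the flow instead of executing it immediately, which is why instantiating once at module level and calling it in the flow is the more idiomatic form.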
t
Hmm, both ways finished instantly. I wonder if it executed at all...
```
[2021-09-29 12:15:32-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'DBT Runs'
[2021-09-29 12:15:32-0400] INFO - prefect.TaskRunner | Task 'PullImage': Starting task run...
[2021-09-29 12:15:33-0400] INFO - prefect.TaskRunner | Task 'PullImage': Finished task run for task with final state: 'Success'
[2021-09-29 12:15:33-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
k
You may have the image cached so it didn’t pull
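One way to confirm whether a pull actually downloaded anything is to look at the final status message Docker emits: "Status: Image is up to date for ..." when the local copy was already current, or "Status: Downloaded newer image for ..." when layers were fetched. A minimal sketch, assuming the events are the decoded dicts produced by the Docker SDK's low-level docker.APIClient().pull(repository, tag=tag, stream=True, decode=True); pull_outcome is a hypothetical helper.

```python
from typing import Iterable, Mapping


def pull_outcome(events: Iterable[Mapping[str, str]]) -> str:
    """Return "cached", "downloaded" or "unknown" from docker pull status events."""
    for event in events:
        status = event.get("status", "")
        if status.startswith("Status: Image is up to date"):
            return "cached"
        if status.startswith("Status: Downloaded newer image"):
            return "downloaded"
    return "unknown"
```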
t
Oh, how do I clear the cache?
k
Try deleting that image from your local then doing it again?
t
ok
k
docker image rm, maybe?
t
the local image is actually healthy
k
Maybe your pull succeeded? Do you want to remove it anyway, just to test whether PullImage works? I expect things to be working, though.
t
Yep, I just removed it and tried running again; it didn't work.
I ran:
```shell
docker image rm 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt
```
and then python flow.py
k
What was your error? Or did the task show success?
Can you try stream_logs=True for the Docker task?
t
No errors; it finishes instantly, with no Docker logs.
let me try that
k
I think the Docker logs are DEBUG level, so you might also need to run export PREFECT__LOGGING__LEVEL=DEBUG
t
added, still the same:
```python
with Flow(
    "DBT Runs",
    # schedule=schedule,
    storage=storage,
    run_config=KubernetesRun()
) as flow:
    run(
        repository='492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt', tag='latest',
        stream_logs=True
    )
```
ok let me add that export
```
[2021-09-29 12:25:16-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'DBT Runs'
[2021-09-29 12:25:16-0400] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-09-29 12:25:16-0400] DEBUG - prefect.FlowRunner | Flow 'DBT Runs': Handling state change from Scheduled to Running
[2021-09-29 12:25:16-0400] INFO - prefect.TaskRunner | Task 'PullImage': Starting task run...
[2021-09-29 12:25:16-0400] DEBUG - prefect.TaskRunner | Task 'PullImage': Handling state change from Pending to Running
[2021-09-29 12:25:16-0400] DEBUG - prefect.TaskRunner | Task 'PullImage': Calling task.run() method...
[2021-09-29 12:25:16-0400] DEBUG - prefect.PullImage | Pulling image 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt:latest
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Pulling from test_dbt
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Digest: sha256:41e235e3ab6029d2edfc0acece52205232061de04a24616166aa4fb343ba3f0d
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Status: Image is up to date for 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt:latest
[2021-09-29 12:25:17-0400] DEBUG - prefect.PullImage | Pulled image 492466275335.dkr.ecr.us-east-1.amazonaws.com/test_dbt:latest
[2021-09-29 12:25:17-0400] DEBUG - prefect.TaskRunner | Task 'PullImage': Handling state change from Running to Success
[2021-09-29 12:25:17-0400] INFO - prefect.TaskRunner | Task 'PullImage': Finished task run for task with final state: 'Success'
[2021-09-29 12:25:17-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-09-29 12:25:17-0400] DEBUG - prefect.FlowRunner | Flow 'DBT Runs': Handling state change from Running to Success
```
It seems it just pulls the image but doesn't run it.
k
Oh, if you want to run it, there are CreateContainer and StartContainer tasks.
t
oh okay
k
and StopContainer and RemoveContainer
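Taken together, these tasks follow the usual Docker container lifecycle: create, start, wait for exit, read logs, remove. A minimal sketch of that sequence using the Docker SDK for Python directly (not Prefect's implementation), assuming client behaves like docker.from_env(); run_container is a hypothetical name. StopContainer is only needed for long-running containers, since a command such as ls exits on its own.

```python
from typing import Any, Dict, List, Optional, Tuple


def run_container(client: Any, image: str, command: Optional[List[str]] = None,
                  environment: Optional[Dict[str, str]] = None) -> Tuple[int, str]:
    """Create, start, wait on, read logs from and remove one container.

    `client` is expected to behave like docker.DockerClient (docker.from_env()).
    Returns (exit status code, combined stdout/stderr logs).
    """
    container = client.containers.create(image, command=command, environment=environment)
    try:
        container.start()                  # roughly StartContainer
        result = container.wait()          # roughly WaitOnContainer; blocks until exit
        logs = container.logs().decode("utf-8", errors="replace")  # roughly GetContainerLogs
        return result["StatusCode"], logs
    finally:
        container.remove(force=True)       # roughly RemoveContainer
```

With a real daemon this would be called as run_container(docker.from_env(), "prefecthq/prefect", command=["ls"]), after the image has been pulled.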
t
I'm a little confused about the best practice for using these tasks together. If there's an example, that would be great! Otherwise, I'll try it out this afternoon.
k
t
Thanks! One thing: the first sentence has a link, but it's a 404:
This is an adaptation of the Docker Pipeline example where the prefecthq/prefect:latest image is pulled and a container is started using that image to run another Flow inside that container.
Is it still available somewhere else?
k
I've never seen that page myself, so I think not.
t
Hmm, I feel that would be a better example page. Alright, let me try what I have first.
Hi Kevin, I'm really getting confused following the instructions in the link you shared. Do you have some time today to go over it together?
k
I can make an example in a bit.
t
thank you!
Just noting down my problems following those instructions, for the record:
1. If I don't give the docker_server_url="tcp://localhost:2375" option (from your example), it fails with this error:
```
docker.errors.APIError: 400 Client Error for http+docker://localhost/v1.41/containers/create: Bad Request ("invalid reference format")
```
2. If I give that option, it fails with this error:
```
docker.errors.DockerException: Error while fetching server API version: HTTPConnectionPool(host='localhost', port=2375): Max retries exceeded with url: /version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x10837c1d0>: Failed to establish a new connection: [Errno 61] Connection refused'))
```
k
This is the pull, right?
t
I'm running these tasks:
```python
image = PullImage(
    repository=ECR_REPO,
    tag="latest",
)

create_container = CreateContainer(
    docker_server_url="tcp://localhost:2375",
    image_name=ECR_REPO,
)
start_container = StartContainer(docker_server_url="tcp://localhost:2375")
wait_on_container = WaitOnContainer(docker_server_url="tcp://localhost:2375")

with Flow(
    "DBT Runs",
    storage=storage,
    run_config=KubernetesRun()
) as flow:
    container_id = create_container(image)
    started = start_container(container_id=container_id)
    status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
```
k
You didn't bring this to Kubernetes yet when you got the error, right? That was just flow.run()?
t
That’s right
k
OK, so I can't replicate because I have a Mac and the default server URL just works for me. Just checking, though: have you tried tcp://0.0.0.0:2375 and tcp://127.0.0.1:2375 for the server URL?
t
I’m using Mac too
Let me try
So does my logic look right?
0.0.0.0 failed with:
```
docker.errors.DockerException: Error while fetching server API version: HTTPConnectionPool(host='0.0.0.0', port=2375): Max retries exceeded with url: /version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x107c78190>: Failed to establish a new connection: [Errno 61] Connection refused'))
```
127.0.0.1 and localhost both failed for the same reason
k
The logic looks right for sure. It’s more about the connection to the docker server
Can you try seeing if using the CLI with docker create gives you the same issue?
t
that works:
```shell
$ docker create test_dbt:latest
c5f89809c7a31e5a0da371acc9d571381b7b29d8540bd0be6ab4f5b198b954f7
```
Is the IP address and port going to be the same when deployed to Cloud?
k
That depends on the execution environment. It would use the Docker installation of that machine.
```python
from prefect import Flow, task
from prefect.tasks.docker.containers import CreateContainer, StartContainer, WaitOnContainer

create = CreateContainer(command=["ls"])
start = StartContainer()
wait = WaitOnContainer()

with Flow("test") as flow:
    id = create("prefecthq/prefect")
    a = start(id)
    wait(id, upstream_tasks=[a])

flow.run()
```
This is my working code, pretty similar to yours, and I get the ls output.
t
Hmm, I ran yours and it failed for a different reason:
```
docker.errors.ImageNotFound: 404 Client Error for http+docker://localhost/v1.41/containers/create: Not Found ("No such image: prefecthq/prefect:latest")
```
It's late today. I can try it again tomorrow. Thanks for your help!
k
Oh, this assumes you already pulled the image; you need to pull that first.
t
It worked with your example now! But I can't seem to pass the logs to Prefect. Is that possible? I used GetContainerLogs, but it had no effect.
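```python
from prefect import Flow
from prefect.tasks.docker.containers import (
    CreateContainer, StartContainer, WaitOnContainer, GetContainerLogs,
)

create = CreateContainer(command=["ls"])
start = StartContainer()
wait = WaitOnContainer()
logs = GetContainerLogs()

with Flow("test") as flow:
    id = create("prefecthq/prefect")
    a = start(id)
    status_code = wait(id, upstream_tasks=[a])
    logs(id, upstream_tasks=[status_code])
```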
It works in my example now too! Cheers.
k
So I think that GetContainerLogs literally just pulls the logs but doesn't pass them on automatically. If you want them passed to Prefect, I think you need to pass them to another task that logs them, but this will be all at once; it won't be streamed. The Docker tasks don't seem to have any streaming functionality.
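A minimal sketch of that "another task that logs them" step, assuming GetContainerLogs returns the container's output as a single string; log_container_output is a hypothetical helper written against the standard logging API. In a Prefect 1.x flow it would presumably be wrapped in an @task that passes prefect.context.get("logger"), so the lines show up in the flow run logs, all at once after the container has finished.

```python
import logging


def log_container_output(logs: str, logger: logging.Logger) -> int:
    """Emit each non-empty line of a container's logs; return how many were emitted."""
    count = 0
    for line in logs.splitlines():
        if line.strip():
            logger.info(line)
            count += 1
    return count
```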
t
Ugh, so Prefect Cloud would not see any progress inside the container, then?
k
No to the first one. I don't know if the Docker Python API supports streaming the logs out. On the second one, I don't think you can do that. Do you know of a way with pure docker commands?
t
Actually, I found a way to pass environment variables: create = CreateContainer(command=["ls"], environment={"DBT_PASSWORD": "xxx"})
So I'll still need to find a way to stream Docker logs, or at least pass the log output to Prefect.
k
Ah, that might be hard. I think passing them line by line is doable, but streaming in real time might be hard. Maybe you can try making a task:
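```python
import prefect
from prefect import task

@task
def mytask(container):
    # `container` must be a docker SDK Container object, not a container ID
    logger = prefect.context.get("logger")
    for line in container.logs(stream=True):
        logger.info(line.decode("utf-8", errors="replace").strip())
```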
Taken from here
t
I think it’s doable by attaching to the container:
Containers
attach(container, stdout=True, stderr=True, stream=False, logs=False, demux=False)
Attach to a container.
The .logs() function is a wrapper around this method, which you can use instead if you want to fetch/stream container output without first retrieving the entire backlog.
But I'm not sure how to do attach in Prefect. I know how to do it on the command line.
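A minimal sketch of streaming through attach with the Docker SDK for Python, assuming container is a docker-py Container that has already been started (for example docker.from_env().containers.get(container_id)); stream_attached_output is a hypothetical helper, and emit could be a Prefect logger's info method. With stream=True and logs=True, attach() yields byte chunks as they arrive, including output produced before attaching, so lines reach the logger while the container is still running rather than all at once after it exits.

```python
import codecs
from typing import Any, Callable


def stream_attached_output(container: Any, emit: Callable[[str], None]) -> int:
    """Forward a running container's output to `emit` line by line; return the line count."""
    # Incremental decoder so multi-byte characters split across chunks survive.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    buffer = ""
    lines = 0
    for chunk in container.attach(stdout=True, stderr=True, stream=True, logs=True):
        buffer += decoder.decode(chunk)
        # A chunk may hold several lines or part of one; emit only complete lines.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            emit(line)
            lines += 1
    buffer += decoder.decode(b"", final=True)
    if buffer:
        emit(buffer)  # trailing output without a final newline
        lines += 1
    return lines
```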
k
Will look in a bit
t
Please ignore my previous deleted questions;
I solved it myself after typing it out 😂
k
Was in a call. Did you get attach working? I'd like to see how.
Or do you still need help on that?
t
No, not attach, but I got the logs printing after the container runs.
Attach isn't urgent now, but it would be good to know if it works.
k
OK, will look in a bit.
t
Actually, one important question just came up. I can run it fine locally now, but when registered to Cloud it fails with a Docker-not-running error. I imagine that's because of the storage I used? Which Prefect image should I use so that a Docker daemon is up and running?
```python
storage = Docker(
    base_image="prefecthq/prefect:0.14.9-python3.7")
```
exception:
```
File "/usr/local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
FileNotFoundError: [Errno 2] No such file or directory
```
k
I believe that is an error with Docker not being able to locate the socket. Could this be a Docker-in-Docker issue? Are you running the Docker agent in a Docker container by chance?
Or is docker working on that machine in general? haha
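A quick way to check the socket theory from inside the pod is a minimal sketch like the one below, assuming the Docker SDK's from_env() convention of using DOCKER_HOST when set and otherwise the unix:///var/run/docker.sock default shown in the PullImage signature; docker_daemon_url and unix_socket_missing are hypothetical helpers. Note that the Prefect Docker tasks take an explicit docker_server_url instead of reading DOCKER_HOST. If the socket file is absent, any unix:// connection fails with the FileNotFoundError above.

```python
import os
from typing import Mapping, Optional

DEFAULT_DOCKER_URL = "unix:///var/run/docker.sock"


def docker_daemon_url(environ: Optional[Mapping[str, str]] = None) -> str:
    """Daemon URL a from_env()-style client would use: DOCKER_HOST or the default socket."""
    environ = os.environ if environ is None else environ
    return environ.get("DOCKER_HOST") or DEFAULT_DOCKER_URL


def unix_socket_missing(url: str) -> bool:
    """True if `url` is a unix:// URL whose socket file does not exist."""
    if not url.startswith("unix://"):
        return False  # tcp:// URLs need a network check instead
    return not os.path.exists(url[len("unix://"):])
```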
t
I'm using KubernetesRun() in Cloud; it fetches images from the ECR registry, and all of our flows run from those Docker images fine.
The other flows never used the PullImage or StartContainer tasks, though.
k
Ah, gotcha. I need to look into this, but in general the socket is not being found.
t
Yes, I usually see this on my local computer when Docker isn't started. Thanks!
In the documentation, it says this:
Docker Tasks
Collection of tasks for orchestrating Docker images and containers.
_Note_: If running these tasks from inside of a docker container itself there are some extra requirements needed for that to work. The container needs to be able to talk to a Docker server. There are a few ways to accomplish this:
1. Use a base image that has Docker installed and running (e.g. https://hub.docker.com/_/docker)
2. Installing the Docker CLI in the base image
So my problem is that the storage image I used, prefecthq/prefect:0.14.9-python3.7, does not have the Docker CLI installed.
k
Ahh I see
t
Does prefecthq have such an image available to use, or do I have to build one myself?
k
You would likely need to build one yourself; I don't think we have an image. I'm not sure this will entirely solve your problem, because I just don't know if you can seamlessly start a container from inside a pod. But we'll see. I guess if Docker is running on the pod, it might work.
t
ok I’ll try it out
wow this is complex..
k
what did you find?
t
https://support.cloudbees.com/hc/en-us/articles/360028151031-Docker-outside-of-Docker-no-longer-works-in-EKS?page=11 It asks for Docker-in-Docker EKS configuration, which basically means touching the Kubernetes configuration.
it’s not easy to install Docker inside an image
k
Ah OK, maybe you need to use the RunNamespacedJob task instead? Yeah, Docker in Docker gets messy.
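RunNamespacedJob sidesteps Docker in Docker by asking Kubernetes to run the dbt image as its own Job. A minimal sketch of such a Job manifest built as a Python dict; dbt_job_body is hypothetical, and passing the result as RunNamespacedJob(body=..., namespace=...) is an assumption about the Prefect 1.x task's parameters that should be checked against its docs. It also assumes the cluster nodes can already pull from ECR, as the existing flows do.

```python
from typing import Any, Dict, Optional


def dbt_job_body(image: str, name: str = "dbt-run",
                 env: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Build a batch/v1 Job manifest that runs `image` once in its own pod."""
    container: Dict[str, Any] = {"name": "dbt", "image": image}
    if env:
        container["env"] = [{"name": key, "value": value} for key, value in env.items()]
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name},
        "spec": {
            "backoffLimit": 0,  # do not retry a failed run automatically
            "template": {
                "spec": {
                    "containers": [container],
                    "restartPolicy": "Never",  # Jobs require Never or OnFailure
                }
            },
        },
    }
```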
t
let me check
k
Wait, this example uses KubernetesRun, though.
t
oh!
let me fill in the missing part
Hmm, I don't have access to that registry:
```python
storage = Docker(
    registry_url="gcr.io/dev/", image_name="docker-on-k8s-flow", image_tag="0.1.0"
)
```
I can't pull that image, and I can't push there either.
k
I think that is just a placeholder for the example.
t
oh
okay let me change
No... it failed for the same reason:
```
Task 'PullImage': Starting task run...
14:20:34 ERROR CloudTaskRunner
Unexpected error: DockerException("Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))")
Traceback (most recent call last):
```
I think it requires running a specific image like docker-on-k8s-flow as the base storage image.
k
Are you doing flow.run()? I think that specific setup is meant to be deployed on k8s.
t
yes
Then how do I deploy to Prefect and let it run on a schedule?
k
flow.run() won't respect the RunConfig, so it won't spin up that second container that provides the Docker-in-Docker daemon.
You really need to edit it into a similar structure, then register it and run it on k8s.
t
Sure, but will Prefect be used to schedule and run it in the future?
i’m running this now:
```python
with Flow(
    "DBT flow",
    storage=storage,
    run_config=KubernetesRun(job_template=job_template)
) as flow:
    pullImage(repository=ECR_REPO, tag="latest")
    container_id = create_container(ECR_REPO)
    started = start_container(container_id=container_id)
    status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
    logs = get_logs(container_id, upstream_tasks=[status_code])
    printLogs(logs)


flow.run_config = KubernetesRun(job_template=job_template)
flow.storage = Docker(
    base_image="prefecthq/prefect:0.14.9-python3.7",
    registry_url="492466275335.dkr.ecr.us-east-1.amazonaws.com/prefect/",
)
```
Nothing takes effect locally.
k
I'm not seeing why it wouldn't. The k8s job template there gets attached to the run config, so Prefect will spin up the job with the two containers inside, one of which runs Docker. If you attach a schedule, it should spin that up each time it runs the flow.
There will be no effect locally with flow.run(), because KubernetesRun is what handles that Docker-in-Docker setup and flow.run() does not use the RunConfig. You also need the whole job template from the example.
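For reference, the Docker-in-Docker part of such a template usually looks like a privileged docker:dind sidecar next to the flow container. Containers in one pod share a network namespace, so the daemon is reachable at tcp://localhost:2375, which is why the Docker tasks in that setup are given docker_server_url="tcp://localhost:2375". A minimal sketch as a Python dict, assuming Prefect fills in the image and command of the container named flow and that the cluster allows privileged pods; dind_job_template is hypothetical and is not the template from the linked example. One known caveat: the sidecar keeps running after the flow container exits, which can keep the Job pod from completing, and this sketch does not handle that.

```python
from typing import Any, Dict


def dind_job_template(dind_image: str = "docker:dind") -> Dict[str, Any]:
    """Job template with a privileged Docker-in-Docker sidecar beside the flow container."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            # Assumed: Prefect fills in image and command for "flow".
                            "name": "flow",
                            "env": [
                                # For docker CLI / docker.from_env() in the flow container;
                                # Prefect's Docker tasks still need docker_server_url.
                                {"name": "DOCKER_HOST", "value": "tcp://localhost:2375"},
                            ],
                        },
                        {
                            "name": "dind",
                            "image": dind_image,
                            # The inner Docker daemon needs privileged mode.
                            "securityContext": {"privileged": True},
                            "env": [
                                # Empty value disables TLS so dockerd listens on plain tcp 2375.
                                {"name": "DOCKER_TLS_CERTDIR", "value": ""},
                            ],
                        },
                    ],
                }
            }
        },
    }
```

It would be passed as KubernetesRun(job_template=dind_job_template()) before registering the flow.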
t
Looks like I'm close, but I'm struggling with the last mile.
yes, I have the whole job template