Mathijs Miermans

01/21/2022, 12:26 AM
Can the ECS Agent execute flows from a custom Docker image that we build? Our goal is to keep our production and local development environments as similar as possible. Locally, we run flows in a custom Docker image, based on the Prefect image, that contains all of our Prefect code. In production we currently use S3 storage. We haven't run into any significant issues with that, but we'd prefer to make the environments more similar. I couldn't find the answer in the docs:
• LocalStorage docs say it's only compatible with the Local Agent, so not with our ECS Agent.
• Docker storage seems the closest to what we'd want, but I understand it would build a separate Docker image for each flow, instead of using a single Docker image.

Kevin Kho

01/21/2022, 12:47 AM
You can specify an image in the ECSRun RunConfiguration, so you can use S3 Storage + ECSRun to run the Flow on top of an image. You can also use Docker Storage, push that image to AWS ECR, and then use that image for the ECSRun.
Are you familiar with RunConfigurations?

Mathijs Miermans

01/21/2022, 3:18 AM
We're successfully using S3 Storage + ECSRun. I was wondering whether it's possible to use ECSRun without S3 Storage, if our custom Docker image contains the flows?

Kevin Kho

01/21/2022, 3:24 AM
Ah, I didn’t understand. This is what you are looking for, where you add multiple flows to the same container. If you want to entirely use your own image without this interface, you can use `stored_as_script=True` and then supply a `path` to the flow. During runtime, this will just go in and grab that file to run the Flow. You can see them in the doc here. When you do `flow.register()`, pass `build=False` so you don’t have to build the container.

Mathijs Miermans

01/21/2022, 3:26 AM
Great! 🙇 I'll give that a try tomorrow.
How does `Storage.flows` get set when a flow is executed on ECS? I tried both Docker storage and Local storage with our ECS Agent, and in both cases I get the error `Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')`. The only place where I see `self.flows` being set is in `add_flow()`, but I thought that function was only used during build/registration?
Docker storage:
from prefect.storage import Docker, Storage  # Prefect 1.x import path


def create_docker_storage(flow_path: str) -> Storage:
    return Docker(
        stored_as_script=True,  # We store the flows in the Docker image
        path=flow_path,  # Path to the flow script inside the Docker container
    )
Local storage:
from prefect.storage import Local, Storage  # Prefect 1.x import path


def create_local_storage(flow_path: str) -> Storage:
    return Local(
        stored_as_script=True,  # We store the flows in the Docker image
        path=flow_path,  # Path to the flow script inside the Docker container
        add_default_labels=False,  # Don't label the flow with the local machine name
    )

Kevin Kho

01/26/2022, 4:57 PM
You can do Local storage plus DockerRun and the path will be relative to the container
If you use Docker storage but supply your own image, you will still need to add the flow to the storage.

Mathijs Miermans

01/26/2022, 5:02 PM
> You can do Local storage plus DockerRun and the path will be relative to the container
In the screenshot you can see that we're passing in an absolute path. Looking at the source code, the first thing it does is check `self.flows`. Where is it using the `path`?
> If you use Docker storage but supply your own image, you will still need to add the flow to the storage.
I believe I did that, but my next step was to try to run an agent locally with Docker storage to debug this.
I should add that the ECS task where the flow is executed is using the Prefect image with our flows included.

Kevin Kho

01/26/2022, 5:09 PM
Ah I see what you are saying. Still thinking about this. It looks good though. I think it would use the path here.

Mathijs Miermans

01/26/2022, 5:11 PM
Is it supposed to call `add_flow` before it calls `get_flow` on ECS? I'm confused how it's otherwise supposed to get past this check:
if flow_name not in self.flows:
    raise ValueError("Flow is not contained in this Storage")
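One way to picture the check quoted above is the simplified stand-in below. It is not Prefect's source: `ToyStorage`, the flow name, and the path are hypothetical, chosen only to mimic the behaviour discussed in this thread. The sketch assumes that `add_flow` records the flow's location under its name and that `get_flow` reaches the `path` only through that `flows` mapping.

```python
from typing import Dict


class ToyStorage:
    """Hypothetical stand-in for a script-based storage; not Prefect's class."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.flows: Dict[str, str] = {}  # flow name -> location of the flow script

    def add_flow(self, flow_name: str) -> str:
        # Record where the flow lives; nothing is built or copied here.
        self.flows[flow_name] = self.path
        return self.path

    def get_flow(self, flow_name: str) -> str:
        # Same guard as the check quoted in the thread.
        if flow_name not in self.flows:
            raise ValueError("Flow is not contained in this Storage")
        # The location is looked up through the mapping, not read from self.path.
        return self.flows[flow_name]


storage = ToyStorage(path="/opt/flows/example_flow.py")  # hypothetical path

# Before add_flow: the mapping is empty, so the guard raises.
try:
    storage.get_flow("example-flow")
    error_message = None
except ValueError as exc:
    error_message = str(exc)

# After add_flow: the name resolves to the stored path.
storage.add_flow("example-flow")
flow_location = storage.get_flow("example-flow")
```

Under these assumptions, the `path` passed to the storage only matters once `add_flow` has copied it into `flows`, which would explain why an absolute path alone does not get past the check.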

Kevin Kho

01/26/2022, 5:11 PM
The Local Storage + ECSRun should work I think. I know it does for Local Storage + Docker.
For that, Anna has an example here that works
Note you need `build=False` also

Mathijs Miermans

01/26/2022, 5:13 PM
I'm passing that in to the `register` call:
flow.register(self.project_name, build=self.build)
(And `self.build` is set to `False`.)
🤞 I think I found my mistake, thanks to the example code you sent: I thought `add_flow()` was called automatically during `flow.register()`. When I call it during CD, I'm now getting a property under 'Flow Locations' that wasn't there before.

Kevin Kho

01/26/2022, 5:23 PM
Ah ok, that sounds promising. `add_flow()` is only called if you build.
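A rough model of that behaviour, using hypothetical names (`ToyStorage`, `toy_register`, `toy_run`) rather than Prefect's real API: it assumes registration serializes the storage together with its `flows` mapping, and that the run side only sees what was serialized. If that assumption holds, skipping the build step means `add_flow` has to be called by hand before registering.

```python
import json
from typing import Dict, Optional


class ToyStorage:
    """Hypothetical stand-in for a script-based storage; not Prefect's class."""

    def __init__(self, path: str, flows: Optional[Dict[str, str]] = None) -> None:
        self.path = path
        self.flows: Dict[str, str] = dict(flows or {})

    def add_flow(self, flow_name: str) -> None:
        self.flows[flow_name] = self.path


def toy_register(storage: ToyStorage, flow_name: str, build: bool = True) -> str:
    """Serialize the storage as registration metadata (simplified assumption)."""
    if build and flow_name not in storage.flows:
        storage.add_flow(flow_name)  # only happens on the build path
    return json.dumps({"path": storage.path, "flows": storage.flows})


def toy_run(metadata: str, flow_name: str) -> str:
    """Rebuild the storage from the metadata and look the flow up at run time."""
    storage = ToyStorage(**json.loads(metadata))
    if flow_name not in storage.flows:
        raise ValueError("Flow is not contained in this Storage")
    return storage.flows[flow_name]


path = "/opt/flows/example_flow.py"  # hypothetical location inside the image

# build=False without a manual add_flow: the serialized mapping is empty.
skipped = toy_register(ToyStorage(path), "example-flow", build=False)
try:
    toy_run(skipped, "example-flow")
    failed = False
except ValueError:
    failed = True

# build=False with a manual add_flow: the location survives serialization.
manual = ToyStorage(path)
manual.add_flow("example-flow")
location = toy_run(toy_register(manual, "example-flow", build=False), "example-flow")
```

In this model the manual `add_flow` call is what makes the flow's location appear in the registered metadata, which is consistent with the new 'Flow Locations' property mentioned in the thread.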

Mathijs Miermans

01/26/2022, 5:23 PM
Ah, that makes sense now. 🙇