# prefect-community
r
Is there a way to lock down which `Storage`s an agent is allowed to run? Our concern is that if an API key is stolen, someone could submit a flow that, e.g., uses a GitHub storage pointing to their own repo and so runs arbitrary code on our ECS agent.
k
Hi @rectalogic, no, there is no way to lock down the storage. We do scan for publicly exposed keys and notify users, but beyond that we can only recommend best practices like rotating keys.
r
would I be able to write my own ECSAgent subclass, override `deploy_flows`, and refuse to deploy if the storage is not what I expect? The docs mention implementing that, but I don't see that method in the source code https://docs.prefect.io/api/latest/agent/agent.html#agent-2
oh, it's `deploy_flow`
k
You could, but let me mention an alternative and the pros and cons of each. Instead, you can use a state handler, which can run any Python code. So you can do something like:
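```python
from prefect.engine.state import Cancelled
from prefect.storage import GitHub

def myhandler(flow, old_state, new_state):
    if new_state.is_running():
        # logic to check storage, e.g. flow.storage or a GraphQL lookup
        if not isinstance(flow.storage, GitHub):
            return Cancelled("Unexpected flow storage")  # the returned state replaces new_state
    return new_state
```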
You can use the GraphQL API to find a flow’s storage. Your own agent subclass is more work: you also have to build an image with the edited code in it, but the upside is that the flow is never deployed if you catch it. The state handler is simpler and its code lives in the Flow. The ECS container still starts, but the handler shuts the run down immediately. It also works across all agent types.
But yes, I think you could do:
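```python
from prefect.agent.ecs import ECSAgent

class MyECSAgent(ECSAgent):
    def deploy_flow(self, flow_run):
        storage_ok = False  # insert logic to check storage here
        if storage_ok:
            return super().deploy_flow(flow_run)
        # some logging, then skip deploying this flow run
        self.logger.warning("Not deploying flow run %s: unexpected storage", flow_run.id)
        return "Skipped flow run with unexpected storage"
```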
r
I was going to do something like this: subclass `ECSAgent`, override `deploy_flow`, check the `Storage` type, and just raise if it's not expected, otherwise call `super()`. That's similar to how it currently checks for Docker storage https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/ecs/agent.py#L321. I'll look at state handlers too.
k
Yeah, and then you can get the storage with GraphQL:
```graphql
query {
  flow {
    storage
  }
}
```
and run it from Python with:
```python
from prefect.client import Client

query = "query { flow { storage } }"  # same query as above
client = Client()
result = client.graphql(query)
```
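To fetch the storage of one specific flow rather than every flow, a filtered query with a variable could be sent through the same client. This is a minimal sketch: the Hasura-style `where: {id: {_eq: $flow_id}}` filter and the `uuid!` variable type are assumptions about the Prefect Cloud schema, and `get_flow_storage` is a hypothetical helper. The client is passed in so the function can be exercised without network access.

```python
from typing import Any, Optional

# Assumed Hasura-style filter on the Prefect Cloud `flow` table
FLOW_STORAGE_QUERY = """
query($flow_id: uuid!) {
  flow(where: {id: {_eq: $flow_id}}) {
    storage
  }
}
"""


def get_flow_storage(client: Any, flow_id: str) -> Optional[dict]:
    """Return the serialized storage of one flow, or None if no flow matched.

    `client` is expected to behave like prefect.client.Client, whose
    `graphql(query, variables=...)` returns the parsed JSON response.
    """
    result = client.graphql(FLOW_STORAGE_QUERY, variables={"flow_id": flow_id})
    flows = result["data"]["flow"]
    return flows[0]["storage"] if flows else None
```

In practice this would be called as `get_flow_storage(Client(), flow_id)`.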
r
it looks like the storage is already in the `flow_run`?
`isinstance(StorageSchema().load(flow_run.flow.storage), Docker)`
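Since the serialized storage already arrives with the `flow_run`, one option is to check it before deserializing anything. A minimal sketch, assuming the serialized dict carries a `"type"` key naming the storage class (e.g. `"GitHub"`) and a `"repo"` key for GitHub storage; `is_allowed_storage` and the allow-list are hypothetical. Loading through `StorageSchema` as above is the more robust route if those key names differ.

```python
from typing import AbstractSet, Any, Mapping


def is_allowed_storage(storage: Mapping[str, Any], allowed_repos: AbstractSet[str]) -> bool:
    """Return True only for GitHub storage pointing at an allow-listed repo.

    Assumes the serialized storage looks like {"type": "GitHub", "repo": "org/name", ...}.
    """
    if storage.get("type") != "GitHub":
        return False  # reject every other storage type outright
    return storage.get("repo") in allowed_repos
```

Inside a `deploy_flow` override this would be applied to `flow_run.flow.storage`, raising when it returns False.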
k
Oh, good catch, yes that makes sense.
r
hmm, I would also like to enforce the `task_definition_arn` that my ECSAgent uses. I had thought that was passed into the constructor, but apparently it comes from the `run_config`. Would I use `set_temporary_config` to set this? The docs say it is only for testing, but the CLI uses it (though I'm not even sure whether it overrides the `run_config`) https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/agent.py#L126
k
I don't think you need it there, because you can handle it in `deploy_flow`, right? You have the `RunConfig` there, so you can just compare there. It sounds like you are editing the CLI to deploy an agent? You can instead make a Python script that calls `Agent.start()`, like `MyECSAgent().start()`.
r
right, but the only way I could see to pass in the API key was by setting `cloud.api_key` in `set_temporary_config`.
For `task_definition_arn`, I want to override anything specified and make it use my existing ARN. Can I just do `run_config = self._get_run_config(flow_run, ECSRun)` and then write into that?
oh, that's a GraphQL query. Ugh, I don't really want whoever submits a flow to have control over this. I want to enforce my `task_definition_arn` so that any task that runs on this agent is forced to use it.
k
I guess yes, you can pass `api_key` through the temporary config. I can’t immediately think of anything that would break.
r
is there another way? I construct an `ECSAgent` and then `start()` it, but it looks like the only way for it to get the key is via the temporary config. That's how the CLI does it, at least.
k
Wait, sorry, what is the issue with doing it that way? I believe if your machine is already authenticated, the agent can pull the key from there. As far as I know, `prefect auth login -k API_KEY` just creates a file called `auth.toml` and puts the key in it, so you could try creating that `auth.toml` file yourself and adding the key there.
r
ok, `set_temporary_config` seemed cleaner until I saw the docs say it should only be used for testing.
k
Ah, I would personally just use it, since the CLI does.
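Putting the two suggestions together (start the agent from a Python script instead of the CLI, and pass the key through `set_temporary_config` the way the CLI does), a script could look roughly like this. It is a sketch assuming Prefect 1.x; `run_ecs_agent` is a hypothetical name, the labels are illustrative, and the stock `ECSAgent` stands in for your own subclass.

```python
def run_ecs_agent(api_key: str) -> None:
    """Start an ECS agent with the API key supplied via a temporary config."""
    # Prefect 1.x imports; local so importing this module does not require Prefect
    from prefect.agent.ecs import ECSAgent  # swap in your own ECSAgent subclass
    from prefect.utilities.configuration import set_temporary_config

    # Same approach as the CLI: create and start the agent inside the temporary config
    with set_temporary_config({"cloud.api_key": api_key}):
        agent = ECSAgent(labels=["ecs"])  # labels are illustrative
        agent.start()  # blocks while the agent polls for flow runs
```

A script's entrypoint might then call `run_ecs_agent(os.environ["MY_PREFECT_API_KEY"])`, where the environment variable name is hypothetical. Prefect 1.x config values can generally also be supplied as environment variables such as `PREFECT__CLOUD__API_KEY`, which may avoid `set_temporary_config` altogether; worth confirming against your Prefect version.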
r
anyway, what about forcing a specific `task_definition_arn` for `ECSAgent`? Is there a way to specify that in some config or set it at runtime?
it seems odd that `ECSAgent` takes ARNs for the roles and a YAML file for the task definition, but `task_definition_arn` seems to come from the `run_config`.
k
The agent takes those in, and they are used as defaults when the `RunConfig` doesn't supply values. If the `RunConfig` supplies them, they override the agent's values.
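The precedence described here, with agent arguments acting as fallbacks and `RunConfig` values winning, can be modelled in a few lines. This is a toy illustration, not Prefect's code; `resolve_setting` and its `enforce` flag are hypothetical, and `enforce=True` is exactly the behaviour the agent does not offer for `task_definition_arn`.

```python
from typing import Optional


def resolve_setting(
    agent_value: Optional[str],
    run_config_value: Optional[str],
    enforce: bool = False,
) -> Optional[str]:
    """Pick the effective value of one ECS setting.

    Without `enforce`, the agent value is only used when the RunConfig leaves
    the setting unset. With `enforce`, the agent value always wins.
    """
    if enforce or run_config_value is None:
        return agent_value
    return run_config_value
```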
r
ok, but where in the agent can I specify `task_definition_arn`?
I don't even see how to default it, let alone enforce it.
k
Ah, I see what you mean. I think that really is only possible through the `RunConfig`.
r
ugh, well my flow submitters won't know what value to use, and I don't want them to have control over any of this. I want to control exactly how the ECS task that runs their flows is configured. I guess I can use boto3 to describe my task definition, serialize it as YAML, save it to a file, and pass that into `ECSAgent` as `task_definition_path`, but that really seems nasty.
k
Trying to understand better: what would be ideal here? You pass an ARN to your custom `ECSAgent`, the RunConfig doesn’t need to know about it, and the Flow pulls it from the agent?
r
the ideal is to force it to use the ARN I specify, so the flow submitter has no control
```python
# Desired API: ECSAgent does not currently accept task_definition_arn
agent = ECSAgent(task_definition_arn="arn:...", ...)
agent.start()
```
so whether or not the `flow_run` specifies anything related to the task definition, the agent overrides it and uses the one I specified. That way I control the ECR image, the roles, memory/CPU, etc. The agent controls everything related to the task, not the flow submitter.
k
I mean, it should be doable. The custom `ECSAgent` class will pretty much just need its own `deploy_flow` code, and you could pull the ARN from `self` instead of the `run_config`. You'd pretty much just need this `else` block downwards, right?
r
well, I didn't want to take over the entire `deploy_flow` implementation and have to keep it up to date with upstream changes. I was looking for a simpler hook than copying and pasting all that code and modifying it. Right now my implementation is simple and just defers to `super()`:
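```python
import os
from prefect.agent.ecs import ECSAgent
from prefect.serialization.storage import StorageSchema
from prefect.storage import GitHub
class MyECSAgent(ECSAgent):
    def deploy_flow(self, flow_run) -> str:
        storage = StorageSchema().load(flow_run.flow.storage)
        if not isinstance(storage, GitHub) or storage.repo != os.environ["PREFECT_GITHUB_REPO"]:
            raise ValueError(f"Storage {storage!r} is not cureatr GitHub")
        return super().deploy_flow(flow_run)
```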
k
Yeah… unfortunately I don’t think you can rely on `super()` here for what you are trying to do.
r
if there is no simple way to do this, I guess I'll just describe the ARN, dump it to YAML, and pass the path to that YAML in as `task_definition_path`. That seems a lot safer but kind of ridiculous, because it's going to recreate that task definition every time even though it already exists...
`self.ecs_client.describe_task_definition(taskDefinition=taskdef_arn)`
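The describe-and-dump workaround could be sketched as below. Assumptions: `describe_task_definition` returns the definition under a `"taskDefinition"` key (as boto3's ECS client does), the read-only fields listed must be stripped so the agent can register the definition again, and JSON output is acceptable where YAML is expected (for plain data like this, JSON is also valid YAML). `export_task_definition` and the output path are hypothetical.

```python
import json
from typing import Any, Dict

# Fields returned by describe_task_definition that register_task_definition does not accept
READ_ONLY_FIELDS = (
    "taskDefinitionArn",
    "revision",
    "status",
    "requiresAttributes",
    "compatibilities",
    "registeredAt",
    "registeredBy",
    "deregisteredAt",
)


def export_task_definition(ecs_client: Any, task_definition_arn: str, path: str) -> Dict[str, Any]:
    """Write an existing ECS task definition to `path` as a reusable template.

    `ecs_client` is expected to be a boto3 ECS client, e.g. boto3.client("ecs").
    """
    response = ecs_client.describe_task_definition(taskDefinition=task_definition_arn)
    template = {
        key: value
        for key, value in response["taskDefinition"].items()
        if key not in READ_ONLY_FIELDS
    }
    # JSON output doubles as YAML, so a YAML loader can read this file
    with open(path, "w") as f:
        json.dump(template, f, indent=2)
    return template
```

The resulting file would then be passed as `ECSAgent(task_definition_path=path, ...)`. Injecting the client keeps the function testable without AWS access.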
but just to confirm, there is no way for the agent to apply modifications/overrides to the `run_config` it receives?
k
I don’t think so, unless you edit `deploy_flow`, but let me double check.
r
can I just set `run_config` on the `flow_run` to a custom `ECSRun`?
`flow_run.run_config = ECSRun(...)`
k
Definitely a good thought. It seems pretty tricky to pull off. Honestly not sure.
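One way to explore the `flow_run.run_config` idea is to rewrite the serialized run config before handing the flow run to `super().deploy_flow`. This is an untested sketch resting on assumptions: that the serialized `ECSRun` dict has `"type": "ECSRun"` and uses the same field names as the `ECSRun` arguments, and that `ECSAgent.deploy_flow` reads `flow_run.run_config` after you replace it. `pin_task_definition` and `SUBMITTER_FIELDS` are hypothetical names.

```python
import copy
from typing import Any, Dict, Optional

# ECSRun fields a submitter could use to change the task (assumed serialized key names);
# they are cleared so only the agent-chosen task definition applies
SUBMITTER_FIELDS = (
    "task_definition",
    "task_definition_path",
    "image",
    "env",
    "cpu",
    "memory",
    "task_role_arn",
    "execution_role_arn",
    "run_task_kwargs",
)


def pin_task_definition(run_config: Optional[Dict[str, Any]], task_definition_arn: str) -> Dict[str, Any]:
    """Return a copy of a serialized ECSRun config forced onto one task definition."""
    if not run_config or run_config.get("type") != "ECSRun":
        raise ValueError("Expected a serialized ECSRun run config")
    pinned = copy.deepcopy(run_config)  # leave the original flow_run data untouched
    for field in SUBMITTER_FIELDS:
        if field in pinned:
            pinned[field] = None
    pinned["task_definition_arn"] = task_definition_arn
    return pinned
```

Inside a `deploy_flow` override this would be used as `flow_run.run_config = pin_task_definition(flow_run.run_config, arn)` followed by `return super().deploy_flow(flow_run)`, which is only worth relying on after checking how the agent actually reads `run_config`.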