Thread
#prefect-community
    r

    rectalogic

    3 months ago
    Is there a way to lock down which Storages an agent is allowed to run? Our concern is if an API key is stolen, and someone submits a flow that e.g. uses a github storage pointing to their own repo and so runs arbitrary code on our ECS agent
    Kevin Kho

    Kevin Kho

    3 months ago
    Hi @rectalogic, no there is no way to lock down the storage. We do scan for publicly exposed keys and notify users, but we don’t have anything more other than to recommend best practices like rotating keys.
    r

    rectalogic

    3 months ago
    would I be able to write my own ECSAgent subclass and override
    deploy_flows
    and refuse to deploy if the storage is not what I expect? The docs mention implementing that but I don't see that method in the source codehttps://docs.prefect.io/api/latest/agent/agent.html#agent-2
    oh, it's
    deploy_flow
    Kevin Kho

    Kevin Kho

    3 months ago
    You could, but then let me just mention an alternative and the pros and cons. I think what you can do instead is use a state handler which can run any Python code. So you can do something like:
    def myhandler(flow, old_state, new_state):
        if new_state.is_running():
            #logic to check storage
            if storage is not expected:
                cancel_flow
    So you can use the GraphQL API to find a flow’s storage. For your own subclass, there is more work. You also have to build an image and get that edited code in, but the upside is the flow is not deployed at all if you can catch it. For the state handler, the code is easier and contained in the Flow. It will just shut down the ECS container immediately, but it still ends up starting. It will also work across all agent types.
    But yes I think you could:
    class MyECSAgent(ECSAgent):
        def deploy_flow(self, flow_run):
            # insert logic to check storage
            if logic:
                super.deploy_flow(flow_run)
            else:
                # some logging and pass
    r

    rectalogic

    3 months ago
    I was going to do something like this - subclass
    ECSAgent
    and override
    deploy_flow
    , check Storage type and just raise if not expected otherwise
    super()
    - similar to how it currently checks for Docker storage https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/ecs/agent.py#L321 I'll look at state handlers too
    Kevin Kho

    Kevin Kho

    3 months ago
    Yeah and then you can get storage with GraphQL:
    query {
      flow {
        storage
      }
    }
    and doing:
    from prefect.client import Client
    client = Client()
    client.graphql(query)
    r

    rectalogic

    3 months ago
    it looks like the storage is in the flow_run already?
    isinstance(StorageSchema().load(flow_run.flow.storage), Docker)
    Kevin Kho

    Kevin Kho

    3 months ago
    Oh good catch yes that makes sense
    r

    rectalogic

    3 months ago
    hmm, I would also like to enforce the
    task_definition_arn
    that my ECSAgent uses - I had thought that was passed into the constructor but apparently it gets it from run_config. Would I use
    set_temporary_config
    to set this? The docs say it is only for testing but the CLI uses it (but also I'm not even sure if this is overriding the run_config or not) https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/agent.py#L126
    Kevin Kho

    Kevin Kho

    3 months ago
    I dont think you need it there because you can have it in
    deploy_flow
    right? You have the RunConfig there so just compare there? I think you are trying to make the CLI edits to deploy an agent? You can just make a Python script to call
    Agent.start()
    Like
    MyECSAgent().start()
    in a python script?
    r

    rectalogic

    3 months ago
    right, but I could only see how to pass in the api key via setting
    cloud.api_key
    in
    set_temporary_config
    for task_definition_arn, I want to override anything specified and make it use my existing ARN. Can I just
    run_config = self._get_run_config(flow_run, ECSRun)
    and then write into that?
    oh, that's a graphql query. Ugh, so I don't really want whomever submits a flow to have control of this. I want to enforce using my task_definition_arn so that any task that runs on this agent is forced to use it
    Kevin Kho

    Kevin Kho

    3 months ago
    I guess yes you can pass api_key through the temporary config. I think you could do that yeah. I can’t think immediately of things that could break.
    r

    rectalogic

    3 months ago
    is there another way? I construct
    ECSAgent
    and then
    start()
    it but it looks like the only way for it to get key is via temp config, that's how the CLI does it at least
    Kevin Kho

    Kevin Kho

    3 months ago
    Wait sorry what is the issue with doing it that way? I believe if your machine is already authenticated, it can pull that. So I believe
    prefect auth login -k API_KEY
    just makes a file called
    auth.toml
    and puts the key in. You can try just making that
    auth.toml
    file and adding it in there?
    r

    rectalogic

    3 months ago
    ok,
    set_temporary_config
    seemed cleaner until I saw in the docs it said only to be used for testing
    Kevin Kho

    Kevin Kho

    3 months ago
    Ah I would personally just use it since the CLI does.
    r

    rectalogic

    3 months ago
    anyway, so what about forcing a specific task_definition_arn for ECSAgent? Is there a way to specify that in some config or set it at runtime?
    it seems odd that ECSAgent takes ARNs for the roles, and a yaml for the task_definition, but the task_definition_arn seems to come from the run_config
    Kevin Kho

    Kevin Kho

    3 months ago
    Agent takes those in and they are the default used if RunConfig does not supply any. If RunConfig is supplied, it will override those
    r

    rectalogic

    3 months ago
    ok, but where in the agent can I specify
    task_definition_arn
    ?
    I don't even see how to default it let alone enforce it
    Kevin Kho

    Kevin Kho

    3 months ago
    Ah I see what you mean. I think that really is only through the RunConfig
    r

    rectalogic

    3 months ago
    ugh, well my flow submitters won't know what value to use, and I don't want them to have control over any of this. I want to control exactly how the ECS task is configured that runs their flows. I guess I can use boto3 to describe my task_definition_arn and serialize that as YAML and save to a file and pass into ECSAgent as
    task_definition_path
    but that really seems nasty
    Kevin Kho

    Kevin Kho

    3 months ago
    Trying to understand better. What would be the ideal here? You submit an ARN to the CustomECSAgent and the RunConfig doesn’t need to know about it so the Flow pulls it from the agent?
    r

    rectalogic

    3 months ago
    the ideal is to force it to use the ARN I specify, so the flow submitter has no control
    agent = ECSAgent(task_definition_arn="arn:...", ...)
    agent.start()
    so whether or not the flow_run specifies anything related to the task_definition, it overrides it and uses the one I specified. So I control the ECR image to use, the roles in use, memory/cpu etc. The agent controls everything related to the task, not the flow submitter
    Kevin Kho

    Kevin Kho

    3 months ago
    I mean it should be doable, the custom ECSAgent class will just pretty much need it’s own
    deploy_flow
    code and you could pull it from
    self
    instead of the
    run_config
    and you pretty much just need this else block downwards right?
    r

    rectalogic

    3 months ago
    well I didn't want to have to take over the entire
    deploy_flow
    implementation and keep up to date with upstream changes. Was looking for a simpler hook than just copy/paste all that code and modify it. Right now my implementation is simple and just defers to super
    def deploy_flow(self, flow_run: GraphQLResult) -> str:
            storage = StorageSchema().load(flow_run.flow.storage)
            if not isinstance(storage, GitHub) or not storage.repo == os.environ["PREFECT_GITHUB_REPO"]:
                raise ValueError(f"Storage {storage.repo} is not cureatr GitHub")
            return super().deploy_flow(flow_run)
    Kevin Kho

    Kevin Kho

    3 months ago
    Yeah…I don’t think we can rely on the
    super
    here for what you are trying to do unfortunately
    r

    rectalogic

    3 months ago
    if there is no simple way to do this I guess I'll just describe the ARN and dump to YAML and pass the path to that YAML in as
    task_definition_path
    - that seems a lot safer but kind of ridiculous - because it's going to recreate that task_definition every time even though it already exists...
    self.ecs_client.describe_task_definition(taskDefinition=taskdef_arn)
    but just to confirm, there is no way for the agent to apply modifications/overrides to the run_config it receives?
    Kevin Kho

    Kevin Kho

    3 months ago
    I don’t think so unless you edit the
    deploy_flow
    but let me double check
    r

    rectalogic

    3 months ago
    can I just set run_config on the flow_run to a custom ECSRun?
    flow_run.run_config = ECSRun(...)
    Kevin Kho

    Kevin Kho

    3 months ago
    Definitely a good thought. It seems pretty tricky to pull off. Honestly not sure.