rectalogic
06/22/2022, 4:45 PMKevin Kho
rectalogic
06/22/2022, 5:25 PMdeploy_flows and refuse to deploy if the storage is not what I expect?
The docs mention implementing that but I don't see that method in the source code
https://docs.prefect.io/api/latest/agent/agent.html#agent-2rectalogic
06/22/2022, 5:27 PMdeploy_flowKevin Kho
def myhandler(flow, old_state, new_state):
if new_state.is_running():
#logic to check storage
if storage is not expected:
cancel_flow
So you can use the GraphQL API to find a flow’s storage.
For your own subclass, there is more work. You also have to build an image and get that edited code in, but the upside is the flow is not deployed at all if you can catch it.
For the state handler, the code is easier and contained in the Flow. It will just shut down the ECS container immediately, but it still ends up starting. It will also work across all agent types.Kevin Kho
class MyECSAgent(ECSAgent):
def deploy_flow(self, flow_run):
# insert logic to check storage
if logic:
super.deploy_flow(flow_run)
else:
# some logging and passrectalogic
06/22/2022, 5:33 PMECSAgent and override deploy_flow, check Storage type and just raise if not expected otherwise super() - similar to how it currently checks for Docker storage https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/ecs/agent.py#L321
I'll look at state handlers tooKevin Kho
query {
flow {
storage
}
}
and doing:
from prefect.client import Client
client = Client()
client.graphql(query)rectalogic
06/22/2022, 5:36 PMisinstance(StorageSchema().load(flow_run.flow.storage), Docker)Kevin Kho
rectalogic
06/22/2022, 9:00 PMtask_definition_arn that my ECSAgent uses - I had thought that was passed into the constructor but apparently it gets it from run_config. Would I use set_temporary_config to set this? The docs say it is only for testing but the CLI uses it (but also I'm not even sure if this is overriding the run_config or not) https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/agent.py#L126Kevin Kho
deploy_flow right? You have the RunConfig there so just compare there?
I think you are trying to make the CLI edits to deploy an agent? You can just make a Python script to call Agent.start()Kevin Kho
MyECSAgent().start() in a python script?rectalogic
06/22/2022, 9:06 PMcloud.api_key in set_temporary_configrectalogic
06/22/2022, 9:07 PMrun_config = self._get_run_config(flow_run, ECSRun) and then write into that?rectalogic
06/22/2022, 9:09 PMKevin Kho
rectalogic
06/22/2022, 9:13 PMECSAgent and then start() it but it looks like the only way for it to get key is via temp config, that's how the CLI does it at leastKevin Kho
prefect auth login -k API_KEY just makes a file called auth.toml and puts the key in. You can try just making that auth.toml file and adding it in there?rectalogic
06/22/2022, 9:19 PMset_temporary_config seemed cleaner until I saw in the docs it said only to be used for testingKevin Kho
rectalogic
06/22/2022, 9:20 PMrectalogic
06/22/2022, 9:21 PMKevin Kho
rectalogic
06/22/2022, 9:23 PMtask_definition_arn?rectalogic
06/22/2022, 9:24 PMKevin Kho
rectalogic
06/22/2022, 9:27 PMtask_definition_path but that really seems nastyKevin Kho
rectalogic
06/22/2022, 9:32 PMagent = ECSAgent(task_definition_arn="arn:...", ...)
agent.start()rectalogic
06/22/2022, 9:33 PMKevin Kho
deploy_flow code and you could pull it from self instead of the run_config and you pretty much just need this else block downwards right?rectalogic
06/22/2022, 9:40 PMdeploy_flow implementation and keep up to date with upstream changes. Was looking for a simpler hook than just copy/paste all that code and modify it. Right now my implementation is simple and just defers to super
def deploy_flow(self, flow_run: GraphQLResult) -> str:
storage = StorageSchema().load(flow_run.flow.storage)
if not isinstance(storage, GitHub) or not storage.repo == os.environ["PREFECT_GITHUB_REPO"]:
raise ValueError(f"Storage {storage.repo} is not cureatr GitHub")
return super().deploy_flow(flow_run)Kevin Kho
super here for what you are trying to do unfortunatelyrectalogic
06/22/2022, 9:45 PMtask_definition_path - that seems a lot safer but kind of ridiculous - because it's going to recreate that task_definition every time even though it already exists...
self.ecs_client.describe_task_definition(taskDefinition=taskdef_arn)rectalogic
06/22/2022, 9:46 PMKevin Kho
deploy_flow but let me double checkrectalogic
06/22/2022, 9:49 PMflow_run.run_config = ECSRun(...)Kevin Kho