rectalogic
06/22/2022, 4:45 PM
Kevin Kho
06/22/2022, 5:22 PM
rectalogic
06/22/2022, 5:25 PM
deploy_flows
and refuse to deploy if the storage is not what I expect?
The docs mention implementing that but I don't see that method in the source code
https://docs.prefect.io/api/latest/agent/agent.html#agent-2deploy_flow
Kevin Kho
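06/22/2022, 5:30 PM
def myhandler(flow, old_state, new_state):
    if new_state.is_running():
        # logic to check storage goes here (pseudocode below)
        if storage is not expected:
            cancel_flow
    # a state handler must return the state the run should move to
    return new_state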
So you can use the GraphQL API to find a flow’s storage.
For your own subclass, there is more work. You also have to build an image and get that edited code in, but the upside is the flow is not deployed at all if you can catch it.
For the state handler, the code is easier and contained in the Flow. It will just shut down the ECS container immediately, but the container still ends up starting. It will also work across all agent types.
from prefect.agent.ecs import ECSAgent

class MyECSAgent(ECSAgent):
    def deploy_flow(self, flow_run):
        # insert logic to check storage
        if logic:
            # super() must be called; a bare `super.deploy_flow` raises AttributeError
            return super().deploy_flow(flow_run)
        else:
            # some logging and pass
            pass
rectalogic
06/22/2022, 5:33 PM
ECSAgent
and override deploy_flow
, check Storage type and just raise if not expected otherwise super()
- similar to how it currently checks for Docker storage https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/ecs/agent.py#L321
I'll look at state handlers too
Kevin Kho
06/22/2022, 5:34 PM
query {
  flow {
    storage
  }
}
and doing:
from prefect.client import Client
client = Client()
client.graphql(query)
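A minimal sketch of that lookup, narrowed to a single flow. The helper names `build_storage_query` and `fetch_flow_storage` are hypothetical, and the Hasura-style `where` filter and the dict-style access to the result are assumptions rather than something confirmed in this thread. `client` is any object with a `graphql(query)` method, such as the `Client` above.

```python
from typing import Any, Optional


def build_storage_query(flow_id: str) -> str:
    """Build a GraphQL query asking for one flow's serialized storage."""
    # Assumption: the API accepts a Hasura-style `where` filter on `flow`.
    return (
        "query {\n"
        f'  flow(where: {{id: {{_eq: "{flow_id}"}}}}) {{\n'
        "    storage\n"
        "  }\n"
        "}"
    )


def fetch_flow_storage(client: Any, flow_id: str) -> Optional[dict]:
    """Return the serialized storage for `flow_id`, or None if no flow matches."""
    result = client.graphql(build_storage_query(flow_id))
    # Assumption: the result supports dict-style access to data -> flow.
    flows = result["data"]["flow"]
    return flows[0]["storage"] if flows else None
```

The query is built by string formatting, so `flow_id` should come from a trusted source such as the flow run itself.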
rectalogic
06/22/2022, 5:36 PM
isinstance(StorageSchema().load(flow_run.flow.storage), Docker)
Kevin Kho
06/22/2022, 5:37 PM
rectalogic
06/22/2022, 9:00 PM
task_definition_arn
that my ECSAgent uses - I had thought that was passed into the constructor but apparently it gets it from run_config. Would I use set_temporary_config
to set this? The docs say it is only for testing, but the CLI uses it (though I'm also not sure whether this overrides the run_config or not) https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/agent.py#L126
Kevin Kho
06/22/2022, 9:04 PM
deploy_flow
right? You have the RunConfig there so just compare there?
I think you are trying to make the CLI edits to deploy an agent? You can just make a Python script to call Agent.start()
MyECSAgent().start()
in a Python script?
rectalogic
06/22/2022, 9:06 PM
cloud.api_key
in set_temporary_config
run_config = self._get_run_config(flow_run, ECSRun)
and then write into that?
Kevin Kho
06/22/2022, 9:11 PM
rectalogic
06/22/2022, 9:13 PM
ECSAgent
and then start()
it, but it looks like the only way for it to get the key is via temp config; that's how the CLI does it at least
Kevin Kho
06/22/2022, 9:18 PM
prefect auth login -k API_KEY
just makes a file called auth.toml
and puts the key in. You can try just making that auth.toml
file and adding it in there?
rectalogic
06/22/2022, 9:19 PM
set_temporary_config
seemed cleaner until I saw the docs say it is only to be used for testing
Kevin Kho
06/22/2022, 9:20 PM
rectalogic
06/22/2022, 9:20 PM
Kevin Kho
06/22/2022, 9:21 PM
rectalogic
06/22/2022, 9:23 PM
task_definition_arn
?
Kevin Kho
06/22/2022, 9:25 PM
rectalogic
06/22/2022, 9:27 PM
task_definition_path
but that really seems nasty
Kevin Kho
06/22/2022, 9:30 PM
rectalogic
06/22/2022, 9:32 PM
agent = ECSAgent(task_definition_arn="arn:...", ...)
agent.start()
Kevin Kho
06/22/2022, 9:37 PM
deploy_flow
code and you could pull it from self
instead of the run_config
and you pretty much just need this else block downwards, right?
rectalogic
06/22/2022, 9:40 PM
deploy_flow
implementation and keep up to date with upstream changes. Was looking for a simpler hook than just copy/paste all that code and modify it. Right now my implementation is simple and just defers to super
def deploy_flow(self, flow_run: GraphQLResult) -> str:
    storage = StorageSchema().load(flow_run.flow.storage)
    if not isinstance(storage, GitHub) or storage.repo != os.environ["PREFECT_GITHUB_REPO"]:
        # non-GitHub storage may have no `repo` attribute, so fall back to the type name
        raise ValueError(f"Storage {getattr(storage, 'repo', type(storage).__name__)} is not cureatr GitHub")
    return super().deploy_flow(flow_run)
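One way to keep that override small is to move the policy into a plain function that can be unit-tested without an agent or a Prefect install. This is only a sketch: `storage_rejection_reason` is a hypothetical helper, and it assumes the deserialized storage object exposes a `repo` attribute as in the snippet above.

```python
from typing import Any, Optional


def storage_rejection_reason(
    storage: Any, storage_cls: type, expected_repo: str
) -> Optional[str]:
    """Return why `storage` is unacceptable, or None if it passes the policy."""
    # Check the type first so a storage class without `repo` cannot raise.
    if not isinstance(storage, storage_cls):
        return f"storage type {type(storage).__name__} is not {storage_cls.__name__}"
    repo = getattr(storage, "repo", None)
    if repo != expected_repo:
        return f"storage repo {repo!r} is not {expected_repo!r}"
    return None
```

Inside `deploy_flow` the override would then call it with `GitHub` and the expected repo, raise `ValueError` when a reason comes back, and otherwise defer to `super()`.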
Kevin Kho
06/22/2022, 9:42 PM
super
here for what you are trying to do, unfortunately
rectalogic
06/22/2022, 9:45 PM
task_definition_path
- that seems a lot safer but kind of ridiculous - because it's going to recreate that task_definition every time even though it already exists...
self.ecs_client.describe_task_definition(taskDefinition=taskdef_arn)
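For reference, a minimal sketch of reading an already-registered task definition with that call. `load_task_definition` is a hypothetical helper, and `ecs_client` is assumed to be a boto3 ECS client (or anything with the same method). The response is assumed to follow the boto3 shape, with the definition under the `taskDefinition` key.

```python
from typing import Any


def load_task_definition(ecs_client: Any, taskdef_arn: str) -> dict:
    """Fetch an existing task definition by ARN instead of registering a new one."""
    response = ecs_client.describe_task_definition(taskDefinition=taskdef_arn)
    # The definition itself sits under "taskDefinition" in the response.
    return response["taskDefinition"]
```

Errors from the client (for example, an unknown ARN) are deliberately left to propagate to the caller.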
Kevin Kho
06/22/2022, 9:48 PM
deploy_flow
but let me double check
rectalogic
06/22/2022, 9:49 PM
flow_run.run_config = ECSRun(...)
Kevin Kho
06/22/2022, 9:50 PM