Matt Alhonte
03/04/2022, 1:22 AMset_schedule_inactive
). It lets me read (I grab all the flows from a given project), but the mutate
gives me this error (the query works when I go to the Interactive tab on the GUI btw)
AuthorizationError([{'path': ['flow'], 'message': 'AuthenticationError: Forbidden', 'extensions': {'code': 'UNAUTHENTICATED'}}])
Unexpected error: ClientError([{'path': ['set_schedule_inactive'], 'message': 'Unauthorized', 'extensions': {'code': 'FORBIDDEN'}}])
Kevin Kho
03/04/2022, 4:23 AMAnna Geller
03/04/2022, 9:21 AMprefect agent local start --label xxx --key API_KEY
Matt Alhonte
03/05/2022, 1:38 AM@task
def make_project_flow_grabber_stmt(project: str) -> str:
return "\n".join(
[
"{",
" flow(",
f' where: {{_and: [{{archived: {{_eq: false}}}}, {{project: {{name: {{_eq: "{project}"}}}}}}]}}',
" ) {",
" id",
" }",
"}",
]
)
@task(log_stdout=True, max_retries=3, retry_delay=datetime.timedelta(seconds=10))
def run_graphql(query: str):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(query)
client = Client()
return client.graphql(query)
then try to turn off the schedules in that project with this:
@task
def make_flow_inactive_queries(flow_ids: t.List[str]) -> t.List[str]:
return [
"\n".join(
[
"mutation {",
" set_schedule_inactive(input: {",
f""" flow_id: "{flow_id['id']}" """,
" }) {",
" success",
" }",
"}",
]
)
for flow_id in flow_ids
]
@task
def turn_off_schedules(queries: t.List[str]) -> t.List[dict]:
client = Client()
return [client.graphql(query) for query in queries]
and here's the body of the flow itself:
project = Parameter("project", required=True, default="")
flow_query = make_project_flow_grabber_stmt(project)
query_result = run_graphql(flow_query)
queries = make_flow_inactive_queries(query_result["data"]["flow"])
mutated = turn_off_schedules(queries)
Client
object. Didn't work - is that sorta the same thing?Kevin Kho
03/05/2022, 2:03 AMMatt Alhonte
03/08/2022, 2:14 AMprefect.utilities.exceptions.AuthorizationError: [{'path': ['set_schedule_inactive'], 'message': 'AuthenticationError: Forbidden', 'extensions': {'code': 'UNAUTHENTICATED'}}]
This is the code btw:
@task
def turn_off_schedules(queries: t.List[str]) -> t.List[dict]:
api_key = PrefectSecret("MATT_API_KEY")
client = Client(api_token=api_key)
return [client.graphql(query) for query in queries]
Kevin Kho
03/08/2022, 2:47 AMMatt Alhonte
03/08/2022, 2:57 AMKevin Kho
03/08/2022, 3:07 AMSecret.get()
?Matt Alhonte
03/08/2022, 3:13 AM@task
def turn_off_schedules(queries: t.List[str], api_key) -> t.List[dict]:
client = Client(api_token=api_key)
return [client.graphql(query) for query in queries]
Kevin Kho
03/08/2022, 3:15 AMMatt Alhonte
03/08/2022, 3:49 AMKevin Kho
03/08/2022, 3:51 AM