Chris Jordan
02/04/2021, 10:10 PM
rerun_flow = StartFlowRun(
    flow_name="blast_import_flow",
    project_name="python_imports"
)
spawn_time_series_import = StartFlowRun(
    project_name="python_imports",
    flow_name="blast_metric_series_flow",
    wait=True)
with Flow("blast_import_flow",
          schedule=daily_schedule,
          state_handlers=[cloud_only_slack_handler]
          ) as flow:
    data_object = get_blast_batch()
    check_api_result = check_api(data_object=data_object)
    transformed_data_objects = transform(data_object=check_api_result)
    imported = import_to_database(data_object_list=transformed_data_objects)
    blast_ids, record_length = push_records_to_summary_table(imported)
    # package ids into the right dict format
    packaged_ids = package_into_params.map(blast_id=blast_ids)
    spawn = spawn_time_series_import.map(
        parameters=packaged_ids)
    with case(record_length, 5):
        rerun_flow()
The ETL portions of the flow are working just fine, and the `spawn_time_series_import` is correctly creating its tasks. But `rerun_flow` isn't - I'm seeing this error:
Unexpected error: ClientError([{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}])
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 860, in get_task_run_state
logger=self.logger,
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 449, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 172, in run
run_link = client.get_cloud_url("flow-run", flow_run_id)
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 887, in get_cloud_url
tenant_slug = self.get_default_tenant_slug(as_user=as_user and using_cloud_api)
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 920, in get_default_tenant_slug
res = self.graphql(query)
File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 318, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}]
I'm not sure what the difference is, aside from the wait. I figure I don't want a bunch of these telescoping and waiting on each other to finish.
Zanie
Chris Jordan
02/04/2021, 10:51 PMZanie
`prefect diagnostics` locally and from the agent's machine?
Chris Jordan
02/04/2021, 10:55 PMZanie
Chris Jordan
02/04/2021, 10:57 PM(python-imports-3.7.6) ┌─[cjordan@WMNAPA-BBO02-20] - [~/work/python_imports] - [2021-02-04 05:56:36]
└─[0] <git:(final_blast bad0de3✈) > prefect diagnostics
{
"config_overrides": {
"cloud": {
"graphql": true
},
"context": {
"secrets": false
},
"debug": true,
"engine": {
"executor": {
"dask": {
"address": true,
"local_processes": true
}
}
},
"flows": {
"defaults": {
"storage": {
"default_class": true
}
}
}
},
"env_vars": [
"PREFECT__CLOUD__AGENT__AUTH_TOKEN"
],
"system_information": {
"platform": "Darwin-19.6.0-x86_64-i386-64bit",
"prefect_backend": "server",
"prefect_version": "0.13.18",
"python_version": "3.7.6"
}
}
(python-imports-3.7.6) ┌─[cjordan@WMNAPA-BBO02-20] - [~/work/python_imports] - [2021-02-04 05:56:41]
└─[0] <git:(final_blast bad0de3✈) >
Zanie
Chris Jordan
02/04/2021, 10:59 PMZanie
Chris Jordan
02/04/2021, 11:00 PMcjordan@livo:/data/cjordan/python_imports$ prefect diagnostics
{
"config_overrides": {},
"env_vars": [],
"system_information": {
"platform": "Linux-4.4.0-1100-aws-x86_64-with-debian-stretch-sid",
"prefect_backend": "cloud",
"prefect_version": "0.13.18",
"python_version": "3.7.6"
}
}
Zanie
Chris Jordan
02/04/2021, 11:02 PMZanie
def get_default_tenant_slug(self, as_user: bool = True) -> str:
    """
    Get the default tenant slug for the currently authenticated user

    Args:
        - as_user (bool, optional): whether this query is being made from a USER scoped token;
            defaults to `True`. Only used internally for queries made from RUNNERs

    Returns:
        - str: the slug of the current default tenant for this user
    """
    # Build the query first: USER-scoped callers go through the `user` root
    # field, everything else asks for the `tenant` root field directly.
    if as_user:
        query = {
            "query": {"user": {"default_membership": {"tenant": "slug"}}}
        }  # type: dict
    else:
        # NOTE(review): `{"slug"}` is a Python set literal, kept exactly as
        # written; presumably `self.graphql` accepts it -- confirm.
        query = {"query": {"tenant": {"slug"}}}

    res = self.graphql(query)

    # The result shape mirrors the query that was sent, so the same flag
    # decides how the slug is extracted.
    if as_user:
        user = res.get("data").user[0]
        slug = user.default_membership.tenant.slug
    else:
        slug = res.get("data").tenant[0].slug
    return slug
Chris Jordan
02/04/2021, 11:03 PMZanie
Chris Jordan
02/04/2021, 11:05 PMZanie
Chris Jordan
02/04/2021, 11:06 PMZanie
Chris Jordan
02/04/2021, 11:07 PMZanie
Chris Jordan
02/04/2021, 11:09 PMprefect agent token
prefect agent start \
--volume "${WORKDIR}/.config.toml:/opt/prefect/config.toml" \
-t "${token}" \
docker \
--show_flow_logs=True
Zanie
Chris Jordan
02/04/2021, 11:10 PMUSER-scoped API tokens function as personal access tokens.
Zanie
Chris Jordan
02/04/2021, 11:15 PMZanie
as_user=False
it's trying to access something that RUNNER tokens don't have permissions to see.
Chris Jordan
02/04/2021, 11:19 PM(python-imports-3.7.6) cjordan@livo:/data/cjordan/python_imports$ screen -r
return self.main(*args, **kwargs)
File "/home/users/cjordan/.pyenv/versions/3.7.6/envs/python-imports-3.7.6/lib/python3.7/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/home/users/cjordan/.pyenv/versions/3.7.6/envs/python-imports-3.7.6/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/users/cjordan/.pyenv/versions/3.7.6/envs/python-imports-3.7.6/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/users/cjordan/.pyenv/versions/3.7.6/envs/python-imports-3.7.6/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/users/cjordan/.pyenv/versions/3.7.6/envs/python-imports-3.7.6/lib/python3.7/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/home/users/cjordan/.pyenv/versions/3.7.6/envs/python-imports-3.7.6/lib/python3.7/site-packages/click/decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "/home/users/cjordan/.pyenv/versions/3.7.6/envs/python-imports-3.7.6/lib/python3.7/site-packages/prefect/cli/agent.py", line 666, in start
docker_interface=not no_docker_interface,
File "/home/users/cjordan/.pyenv/versions/3.7.6/envs/python-imports-3.7.6/lib/python3.7/site-packages/prefect/agent/agent.py", line 219, in start
self._verify_token(self.client.get_auth_token())
File "/home/users/cjordan/.pyenv/versions/3.7.6/envs/python-imports-3.7.6/lib/python3.7/site-packages/prefect/agent/agent.py", line 178, in _verify_token
raise AuthorizationError("Provided token does not have a RUNNER scope.")
prefect.utilities.exceptions.AuthorizationError: Provided token does not have a RUNNER scope.
cjordan@livo:/data/cjordan/python_imports$
Zanie
Chris Jordan
02/04/2021, 11:21 PMZanie
Chris Jordan
02/04/2021, 11:22 PMProvided token does not have a RUNNER scope.
Zanie
Chris Jordan
02/04/2021, 11:34 PMZanie
Chris Jordan
02/04/2021, 11:35 PM