liren zhang
12/21/2020, 6:38 PMdef get_task_run(flow_run_id):
task_run_query = """
query {
task_run ( where:{flow_run_id:{_eq: "%s"}}){
id,
name,
start_time,
end_time,
state,
state_message,
created,
flow_run_id,
heartbeat,
logs {message
}
state_result,
state_start_time,
state_timestamp,
task_id,
tenant_id,
updated,
version
}
}
""" % flow_run_id
# logs {
# id,
# level,
# message
# },
task_run_results = json.loads(Client().graphql(task_run_query).to_json())
for task_run in task_run_results['data']['task_run']:
task_json = json.dumps(task_run).replace('\\', '\\\\').replace("'", "\\'")
print('liren again.............')
save_to_file(task_json,'task_run.log')
Kyle Moon-Wright
12/21/2020, 7:36 PM
save_to_file
call). Just wanted to check, is your Python client authenticated with Prefect Cloud?
liren zhang
12/21/2020, 7:55 PM
Kyle Moon-Wright
12/21/2020, 8:14 PM
""" % flow_run_id
# logs {
# id,
# level,
# message
# },
which can be rectified by adding these fields to the logs{} part of the task_run_query call.
liren zhang
12/21/2020, 8:35 PM
Kyle Moon-Wright
12/21/2020, 8:58 PM
save_to_file
) is working as a flow for me:
from prefect import Client, task, Flow
import json
@task
def get_task_run(flow_run_id):
    """Query the logs of every task run that belongs to ``flow_run_id``.

    The GraphQL query asks only for each task run's ``logs`` (``id``,
    ``message``, ``level``). Each returned task run is dumped to JSON and has
    its backslashes and single quotes backslash-escaped; the escaped text is
    not used further because the ``save_to_file`` call is commented out.
    """
    # The query text is kept flush-left so the string sent to the API is
    # exactly the one from the original paste.
    task_run_query = """
query {
task_run (where:{ flow_run_id:{_eq: "%s"} }) {
logs {
id,
message,
level
}
}
}
""" % flow_run_id
    response = Client().graphql(task_run_query)
    payload = json.loads(response.to_json())
    for record in payload['data']['task_run']:
        serialized = json.dumps(record)
        # Double the backslashes first so the quote escapes added next are
        # not themselves re-escaped.
        task_json = serialized.replace('\\', '\\\\').replace("'", "\\'")
        # save_to_file(task_json,'task_run.log')
# Build a one-task flow that calls get_task_run for a single flow run.
with Flow("test") as flow:
    # Placeholder: replace with the ID of the flow run to inspect. It is
    # quoted so the snippet parses; a bare <MY_FLOW_RUN_ID> is a syntax error.
    test_id = "<MY_FLOW_RUN_ID>"
    get_task_run(test_id)

# Execute the flow once it has been fully defined.
flow.run()
liren zhang
12/22/2020, 4:02 AM
Kyle Moon-Wright
12/22/2020, 7:53 PM