liren zhang
12/21/2020, 6:38 PMdef get_task_run(flow_run_id):
    task_run_query = """
        query {
        task_run ( where:{flow_run_id:{_eq: "%s"}}){
            id,
            name,
            start_time,
            end_time,
            state,
            state_message,
            created,
            flow_run_id,
            heartbeat,
            logs {message
            }
            state_result,
            state_start_time,
            state_timestamp,
            task_id,
            tenant_id,
            updated,
            version
        }
        }
    """ % flow_run_id
            # logs { 
            # id,
            # level,
            # message  
            # },
    task_run_results = json.loads(Client().graphql(task_run_query).to_json())
    for task_run in task_run_results['data']['task_run']:
        task_json = json.dumps(task_run).replace('\\', '\\\\').replace("'", "\\'")
        print('liren again.............')
        save_to_file(task_json,'task_run.log')Kyle Moon-Wright
12/21/2020, 7:36 PMsave_to_file call). Just wanted to check, is your python client authenticated with Prefect Cloud?liren zhang
12/21/2020, 7:55 PMKyle Moon-Wright
12/21/2020, 8:14 PM""" % flow_run_id
            # logs { 
            # id,
            # level,
            # message  
            # },
which can be rectified by adding these fields to the logs{} part of the task_run_query call.liren zhang
12/21/2020, 8:35 PMliren zhang
12/21/2020, 8:36 PMliren zhang
12/21/2020, 8:37 PMKyle Moon-Wright
12/21/2020, 8:58 PMsave_to_file) is working as a flow for me:
from prefect import Client, task, Flow
import json
@task
def get_task_run(flow_run_id):
    task_run_query = """
        query {
            task_run (where:{ flow_run_id:{_eq: "%s"} }) {
                logs {
                    id,
                    message,
                    level
                }
            }
        }
    """ % flow_run_id
    task_run_results = json.loads(Client().graphql(task_run_query).to_json())
    for task_run in task_run_results['data']['task_run']:
        task_json = json.dumps(task_run).replace('\\', '\\\\').replace("'", "\\'")
        # save_to_file(task_json,'task_run.log')
with Flow("test") as flow:
    test_id = <MY_FLOW_RUN_ID>
    get_task_run(test_id)
flow.run()liren zhang
12/22/2020, 4:02 AMliren zhang
12/22/2020, 5:13 PMKyle Moon-Wright
12/22/2020, 7:53 PM