# prefect-community
l
Hi Prefect experts, I am trying to run the following GraphQL query via prefect.client. However, it does not like the logs{ message} entry. I was able to run the same GraphQL query in the Interactive API. Can anyone point out what I am doing wrong?
def get_task_run(flow_run_id):
    task_run_query = """
        query {
        task_run ( where:{flow_run_id:{_eq: "%s"}}){
            id,
            name,
            start_time,
            end_time,
            state,
            state_message,
            created,
            flow_run_id,
            heartbeat,
            logs {message
            }
            state_result,
            state_start_time,
            state_timestamp,
            task_id,
            tenant_id,
            updated,
            version
        }
        }
    """ % flow_run_id
            # logs { 
            # id,
            # level,
            # message  
            # },
    task_run_results = json.loads(Client().graphql(task_run_query).to_json())
    for task_run in task_run_results['data']['task_run']:
        task_json = json.dumps(task_run).replace('\\', '\\\\').replace("'", "\\'")
        print('liren again.............')
        save_to_file(task_json,'task_run.log')
k
Hey @liren zhang, I was able to get the code you provided to work as is (with the exception of the save_to_file call). Just wanted to check, is your python client authenticated with Prefect Cloud?
l
Yes @Kyle Moon-Wright, it was authenticated. As soon as I removed the logs{} block, the code worked fine.
k
Hmm, so working as expected? Or are you referring to removing this part?:
""" % flow_run_id
            # logs { 
            # id,
            # level,
            # message  
            # },
which can be rectified by adding these fields to the logs{} part of the task_run_query call.
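As a rough illustration of keeping the log fields inside the query text rather than commented out after the string, here is a minimal sketch. The helper name build_task_run_query and its log_fields parameter are hypothetical and not part of Prefect; the field names are the ones used in this thread, and quoting the ID with json.dumps is an assumption that JSON-style string escaping is acceptable for the GraphQL string literal.

```python
import json


def build_task_run_query(flow_run_id, log_fields=("id", "level", "message")):
    """Return GraphQL query text for task runs of one flow run, including logs.

    Hypothetical helper for illustration only; it just assembles a string.
    """
    # json.dumps adds the surrounding double quotes and escapes the ID.
    quoted_id = json.dumps(flow_run_id)
    # The log fields go inside the logs { ... } selection of the query itself.
    log_selection = " ".join(log_fields)
    return (
        "query {\n"
        f"  task_run(where: {{flow_run_id: {{_eq: {quoted_id}}}}}) {{\n"
        "    id\n"
        "    name\n"
        "    state\n"
        f"    logs {{ {log_selection} }}\n"
        "  }\n"
        "}"
    )


# Placeholder ID; substitute a real flow run ID before sending the query.
print(build_task_run_query("<MY_FLOW_RUN_ID>"))
```

The resulting string could then be passed to Client().graphql(...) in the same way as task_run_query above.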
l
Sorry, I originally had those lines in my query; I commented them out and moved them outside the query string.
The bottom line is that when I run the query with logs{message}, for example, the code errors out during the task run in the cloud.
You can ignore what I had commented out.
k
Hmm, I'm not able to reproduce - this MRE (w/o save_to_file) is working as a flow for me:
from prefect import Client, task, Flow
import json

@task
def get_task_run(flow_run_id):
    task_run_query = """
        query {
            task_run (where:{ flow_run_id:{_eq: "%s"} }) {
                logs {
                    id,
                    message,
                    level
                }
            }
        }
    """ % flow_run_id
    task_run_results = json.loads(Client().graphql(task_run_query).to_json())
    for task_run in task_run_results['data']['task_run']:
        task_json = json.dumps(task_run).replace('\\', '\\\\').replace("'", "\\'")
        # save_to_file(task_json,'task_run.log')

with Flow("test") as flow:
    test_id = "<MY_FLOW_RUN_ID>"  # placeholder: substitute a real flow run ID
    get_task_run(test_id)

flow.run()
l
Hi @Kyle Moon-Wright, thanks for your response. Once I changed flow.register() to flow.run(), with logs{message} still included in the query, it works for me as well. The only time it errors out is when I register it to Cloud.
@Kyle Moon-Wright do you have any additional thoughts on this?
k
Hey Liren - apologies, I was able to reproduce this but was not able to rectify the error. Would you be able to open a GitHub issue for it? This looks to be a Cloud-specific bug.