Does anyone know what is going on here? In every t...
# ask-community
Joseph Loss
Does anyone know what is going on here? In every task, I use logger = prefect.context.get('logger') and then call logger.info(), logger.debug(), etc.
D:\venv\poetry\.venv\lib\site-packages\prefect\utilities\logging.py:123:
UserWarning: Failed to write logs with error:
ClientError('400 Client Error: Bad Request for url: https://api.prefect.io/graphql

The following error messages were provided by the GraphQL server:

    INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at
        "input.logs[0].flow_run_id"; Expected non-nullable type UUID! not to be null.
    INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at
        "input.logs[2].flow_run_id"; Expected non-nullable type UUID! not to be null.
    INTERNAL_SERVER_ERROR: Variable "$input" got invalid value null at
        "input.logs[4].flow_run_id"; Expected non-nullable type UUID! not to be null.
Chris White
Hi Joseph - it looks like you’re trying to send logs to Prefect Cloud outside of a flow run context
Kevin Kho
Hey @Joseph Loss, could you show an example of how you’re using the logger?
Joseph Loss
I think @Chris White is right. There were a few places within the flow run itself that had logging.info calls. So I'll either change those to print statements or, more likely, change them to logger calls and move them into their respective tasks
👍 1
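For reference, a minimal sketch of the fix being discussed here: pull the logger from prefect.context inside each task body, so the log records carry a flow_run_id (Prefect 1.x API; the task and flow names below are made up for illustration):

import prefect
from prefect import task, Flow

@task
def transform(x):
    # Pull the logger from context inside the task, where a flow run
    # (and therefore a flow_run_id) exists; logging at flow-build level
    # has no run context, so Cloud rejects those records with a null
    # flow_run_id, which produces the 400 error above.
    logger = prefect.context.get("logger")
    logger.info("transforming %s", x)
    return x * 2

with Flow("logging-example") as flow:
    transform(1)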
@Kevin Kho Dude, I am so close to getting that massive beta_sheets gist I sent you last week working, but there is one thing I'm trying to do to make our lives easier and I'm having trouble with it
😂 1
If you look back at https://gist.github.com/chicago-joe/f18ef051da101594160ebd3771ee4373 you'll see that the functions basically all share the same first 3 parameters. These are pyodbc connections to different MySQL server databases. I can't pickle them, so I've tried using prefect.utilities.as_task(), which worked for the connection, but now every task would need to call that and return the result, which is more complicated because of how we wrote our queries
So I'm wondering how I could build something that we can call / map to each task. Maybe something like this? https://docs.prefect.io/core/idioms/mapping.html
Kevin Kho
This you can't do, I'm afraid. Task outputs need to be pickle-able because they get distributed to Dask workers with cloudpickle. The connection would have to be made inside the mapped task, like what you're doing. What is the issue on the query side?
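A minimal sketch of the pattern Kevin describes, assuming Prefect 1.x and a DSN that pyodbc can resolve: pass only the picklable DSN string between tasks and open the connection inside the (mapped) task. The DSN names and queries here are placeholders:

import pandas as pd
import pyodbc
from prefect import task, Flow

@task
def run_query(dsn, query):
    # The connection object can't be pickled, but the DSN string can,
    # so the connection is built inside the task at run time.
    conn = pyodbc.connect(dsn)
    try:
        return pd.read_sql(query, conn)
    finally:
        conn.close()

with Flow("query-example") as flow:
    results = run_query.map(
        dsn=["sra200", "sra300"],
        query=["SELECT 1", "SELECT 1"],
    )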
Joseph Loss
Okay, no worries. It's not really an issue; it's just that the queries were written like this:
import pandas as pd
import pyodbc
from prefect import task

# Connection built at module level and passed into the task
eng_sra200 = pyodbc.connect('sra200')

@task(log_stdout=True)
def get_tracking_error(eng_sra200, data_datetime, date_next_bus, backfill):
    # Pick the run date depending on whether this is a backfill run
    if backfill:
        run_date = date_next_bus
    else:
        run_date = data_datetime

    query = """
            SELECT accnt, strategyLevel, trackingError / 10000 AS 'trackingError'
            FROM sradb.tbltrackingerrorticker_v3
            WHERE portfolioType = 'accnt-stratLevel'
            and DATE = '%s'
            GROUP BY accnt, strategyLevel
            """ % run_date.strftime('%Y-%m-%d')

    return pd.read_sql(query, eng_sra200)
So I was trying to come up with a way to automate this for functions that call multiple DSN sources, so that I wouldn't have to make these edits in a dozen functions haha
👍 1
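One possible way to avoid repeating that edit across a dozen functions, sketched under the same constraint: have each task accept the DSN name instead of a connection object and resolve it through a small shared helper. get_connection and the parameter names below are hypothetical, not a Prefect or pyodbc API:

import pandas as pd
import pyodbc
from prefect import task

def get_connection(dsn):
    # Hypothetical shared helper: turns a picklable DSN name into a live
    # pyodbc connection at task run time.
    return pyodbc.connect(dsn)

@task(log_stdout=True)
def get_tracking_error(dsn_sra200, data_datetime, date_next_bus, backfill):
    # Same query as above; the only change is that the task receives a DSN
    # name and builds its own connection, so nothing unpicklable is passed in.
    if backfill:
        run_date = date_next_bus
    else:
        run_date = data_datetime

    query = """
            SELECT accnt, strategyLevel, trackingError / 10000 AS 'trackingError'
            FROM sradb.tbltrackingerrorticker_v3
            WHERE portfolioType = 'accnt-stratLevel'
            and DATE = '%s'
            GROUP BY accnt, strategyLevel
            """ % run_date.strftime('%Y-%m-%d')

    conn = get_connection(dsn_sra200)
    try:
        return pd.read_sql(query, conn)
    finally:
        conn.close()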