# ask-community
j
Anyone know how to resolve these errors that prevent my flow from running?
- cloudpickle: (flow built with '1.6.0', currently running with '2.0.0')
- prefect: (flow built with '0.15.6', currently running with '0.15.9')
- python: (flow built with '3.8.8', currently running with '3.7.11')
k
In general, the Python, Prefect, and cloudpickle versions on the agent need to match the versions you registered the Flow with. This is because the flow is serialized at registration, and deserialization on the agent needs to match that serialization.
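A quick way to spot the mismatch is to collect the same three versions in the registration environment and in the agent's environment, then compare them. A minimal sketch using only the standard library; `current_versions` and `find_mismatches` are hypothetical helper names, not part of Prefect. It reads `__version__` instead of using `importlib.metadata` so it also works on the agent's Python 3.7:

```python
import importlib
import platform


def current_versions() -> dict:
    """Collect the Python, prefect, and cloudpickle versions of this environment."""
    versions = {"python": platform.python_version()}
    for module_name in ("prefect", "cloudpickle"):
        try:
            module = importlib.import_module(module_name)
        except ImportError:
            versions[module_name] = None  # package not installed here
        else:
            versions[module_name] = getattr(module, "__version__", None)
    return versions


def find_mismatches(built: dict, running: dict) -> dict:
    """Map each name whose version differs to a (built, running) pair."""
    return {
        name: (built_version, running.get(name))
        for name, built_version in built.items()
        if built_version != running.get(name)
    }
```

Fed the versions from the error message, `find_mismatches` reports all three entries; running `current_versions()` on both machines produces the two dictionaries to compare.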
j
Just fixed it by changing the flow name (go figure)... on to another quick question:
TypeError: can't pickle SSLContext objects
k
The Prefect `@task` has `checkpoint=True` enabled by default so that when your Flow fails, it can be restarted from the point of failure. `checkpoint=True` serializes the return value of your task, so I think you will get this error if you return some sort of connection or client, which can't be pickled. For that specific task, you can turn off checkpointing with `checkpoint=False`.
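To check ahead of time whether a task's return value can be checkpointed, you can try serializing it yourself. A minimal sketch: it uses the standard-library `pickle` as a stand-in for the cloudpickle serialization Prefect actually performs, and `is_picklable` is a hypothetical helper name. An `ssl.SSLContext`, such as one held by a database connection opened with SSL, fails the check:

```python
import pickle
import ssl


def is_picklable(obj) -> bool:
    """Return True if obj can be serialized with pickle, roughly what checkpointing needs."""
    try:
        pickle.dumps(obj)
    except (TypeError, AttributeError, pickle.PicklingError):
        return False
    return True


# Plain data (like a list of rows) serializes fine...
print(is_picklable({"rows": [(1, "a"), (2, "b")]}))  # True
# ...but an SSL context, the object named in the error, does not.
print(is_picklable(ssl.create_default_context()))  # False
```

Returning data rather than the connection object is what lets checkpointing stay on for a task.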
j
Similar to this?
@task(log_stdout=True, checkpoint=False)
k
Yes exactly
j
Well, I got a little further to a different error 🙂 Really appreciate the help, I'm going to see if I can get past this one
@Kevin Kho Any ideas on this? Is this an issue in setting up the database? It is coming from my "load" task: `pandas.io.sql.DatabaseError: Execution failed on sql 'SELECT name FROM sqlite_master WHERE type='table' AND name=?;': not all arguments converted during string formatting`
k
Could you show me the task definition?
Looks like this error comes from pandas though? But I can take a look
j
The connection task or the load?
k
Both works
j
@task(log_stdout=True)
def load(connection: any, if_exists: str, db_table: str, dataframe: pd.DataFrame) -> pd.DataFrame:
    res = dataframe.to_sql(name=db_table, con=connection, if_exists=if_exists, chunksize=None, index=False)
    print(res)
For the connection I'm hiding some sensitive data
@task(log_stdout=True, checkpoint=False)
def db_connection(credentials: dict) -> any:
    ssl_args = {these are fine}
    user = credentials['username']
    password = credentials['password']
    url = 'confidential-url'
    port = 3306
    connection = pmy.connect(user=user,password=password,host=url,port=port,connect_timeout=10,ssl=ssl_args,
    charset='utf8')
    return connection
k
You can post it. Just replace the sensitive values with something like “XXXXXX”.
Ah ok
j
A few resources said to use sqlalchemy but didn't really explain how to incorporate it
k
This will show you.
I don’t think there is anything wrong on the Prefect side of things. It just seems that pymysql is giving you headaches.
j
Replace the `pmy.connect` portion with `create_engine`?
k
Yes, and then pass the engine to your `to_sql` call.
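Some context for the earlier error: pandas' `to_sql` expects a SQLAlchemy engine or connection and treats any other DBAPI connection as a legacy `sqlite3` connection. That is why it ran a `sqlite_master` query against MySQL, and why pymysql (which uses `%s` placeholders, not `?`) complained about string formatting. When building the engine URL, two details are worth knowing: a plain `mysql://` URL makes SQLAlchemy default to the mysqlclient (MySQLdb) driver, so `mysql+pymysql://` keeps you on pymysql, and special characters in the password must be URL-escaped. A minimal sketch of the URL step using only the standard library; `build_mysql_url` is a hypothetical helper, and the user, password, and host below are placeholders:

```python
from urllib.parse import quote_plus


def build_mysql_url(user: str, password: str, host: str, port: int,
                    database: str = "", charset: str = "utf8") -> str:
    """Build a SQLAlchemy URL for MySQL through the pymysql driver.

    quote_plus escapes characters such as '@', ':' or '/' that would
    otherwise break the URL if they appear in the user name or password.
    """
    return (
        f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}?charset={charset}"
    )


url = build_mysql_url("etl_user", "p@ss:word", "db.example.com", 3306)
print(url)  # mysql+pymysql://etl_user:p%40ss%3Aword@db.example.com:3306/?charset=utf8

# The URL would then go to SQLAlchemy (not run here). Passing the SSL options
# under the "ssl" key is an assumption that mirrors the pmy.connect(ssl=ssl_args) call:
#   engine = sqlalchemy.create_engine(url, connect_args={"ssl": ssl_args})
#   dataframe.to_sql(name=db_table, con=engine, if_exists="append", index=False)
```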
j
Cool, thank you!
k
Actually... just wanna make sure you know we have a MySQL task?
j
I read through that documentation but didn't see an actual example haha
k
This says your current approach is deprecated
j
Good to know for our internal documentation 😉
k
I don’t have a MySQL db to test but it would be like this:
from prefect import Flow
from prefect.tasks.mysql import MySQLFetch

mysql = MySQLFetch()
with Flow("...") as flow:
    a = mysql(db_name, ...)
to get stuff from the database
j
I'd be replacing my "db_connection" with MySQLFetch, I assume?
k
Fetch opens a connection, runs the query, and closes the connection, all in one task.
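Conceptually, that open-query-close pattern looks like the function below. This is only an illustration, not MySQLFetch's actual code: it uses the standard-library `sqlite3` module as a stand-in for a MySQL driver, and `fetch_all` is a hypothetical name. Note that what comes back is rows (a list of tuples), not a connection:

```python
import sqlite3
from contextlib import closing


def fetch_all(database: str, query: str, params: tuple = ()) -> list:
    """Open a connection, run one query, close everything, and return the rows."""
    with closing(sqlite3.connect(database)) as conn:
        with closing(conn.cursor()) as cursor:
            cursor.execute(query, params)
            # A list of tuples; cursor and connection are closed on the way out.
            return cursor.fetchall()
```

For example, `fetch_all(":memory:", "SELECT 1, 'a'")` returns `[(1, 'a')]`. Because nothing connection-like is returned, a task built this way has nothing unpicklable to checkpoint.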
j
Is there an example lying around that I can model it off?
k
I haven’t seen one, but the code snippet above shows how to use it in your flow.
j
Cool, I'll see what I can do!
Appreciate the help
One more dumb question - in your example above does the MySQLFetch function as its own task?
@task(log_stdout=True, checkpoint=False)
def db_connection(credentials: dict) -> any:
    db = MySQLFetch(bla bla)
    return db
k
It is its own task, so there's no need to wrap it like that unless you want to modify it in some way.
In that case, you can do:
@task(log_stdout=True, checkpoint=False)
def db_connection(credentials: dict) -> any:
    df = MySQLFetch(bla bla).run()
    return df
Note this doesn't return a connection. It returns the data already.
j
Ahhhh. So it works as an "extract" task.
Do I need to be importing it earlier? I'm getting an error on using MySQLFetch:
k
from prefect.tasks.mysql import MySQLFetch
I think
Docs for that are here
j
I'll never get over the Python syntax after learning R... thank you!!
k
That is true. You just get everything with `library(…)`
j
TypeError: can't concat list to bytes
?
k
Could you show me what you are doing?
j
Sure! So, I'm trying to combine the MySQLFetch task with a standard ETL task/flow setup so that when I end up making transformations, I can include those in the "transform" task.
@task(log_stdout=True)
def db_connection(credentials: dict) -> any:
    import sqlalchemy as db
    ssl_args = {"XXX"}  # for local agent testing
    user = credentials['username']
    password = credentials['password']
    url = 'XXX'
    port = XXX
    connection = db.create_engine(
        f'mysql://{user}:{password}@{url}:{port}/?charset=utf8', 
        connect_args=ssl_args
    )
    return connection 

@task(log_stdout=True, checkpoint=False)
def extract(credentials: dict) -> any:
    ssl_args = {"XXXX"}  # for local agent testing
    query = "QUERY HERE"
    df = MySQLFetch(user=['username'], 
    password=['password'],
    host='XXX',
    port=XXXX,
    ssl=ssl_args,
    charset='utf8',
    query=query).run()
    return df

# This is where we will do any necessary date transformations
@task(log_stdout=True)
def transform(df: pd.DataFrame) -> pd.DataFrame:
    # print(dataframe)
    return df

@task(log_stdout=True)
def load(connection: any, if_exists: str, db_table: str, dataframe: pd.DataFrame) -> pd.DataFrame:
    res = dataframe.to_sql(name=db_table, con=connection, if_exists=if_exists, chunksize=None, index=False)
    print(res)

# Flow
flow = Flow("Flow Name Here")
with flow:
    df = extract(credentials = PrefectSecret("SECRET"))
    sink = transform(df=df)
    load(connection=connection, if_exists='append', db_table='XXXXX', dataframe=sink)
k
Was in a call. I am not sure where this error is coming from. Do you have a clue? Is it in the checkpointing of load?
j
It's in `db_connection`
k
Maybe turn checkpointing off for that one
j
Led me back to the "can't concat list to bytes" error in the extract task unfortunately.
k
Oh oof. Why are your username and password in a Python list? Should it be `credentials['username']`?
j
That took care of the error
Just down to 1 remaining problem which I believe is in the "connection" task
TypeError: connection() missing 1 required positional argument: 'credentials'
I think I should rework my "load" task to just use the results of the extract task?
k
I can’t see what `connection` refers to inside the flow block?
j
# Flow
flow = Flow("XXX")
with flow:
    df = extract(credentials = PrefectSecret("XXXX"))
    sink = transform(df=df)
    load(connection=connection, credentials = PrefectSecret("XXX"), if_exists='append', db_table='XXXt', dataframe=sink)
And this is the `connection` task:
@task(log_stdout=True, checkpoint=False)
def connection(credentials: dict) -> any:
    import sqlalchemy as db
    ssl_args = {"XXX"}  # for local agent testing
    user = credentials['username']
    password = credentials['password']
    url = 'XXX'
    port = XXXX
    connection = db.create_engine(
        f'mysql://{user}:{password}@{url}:{port}/?charset=utf8', 
        connect_args=ssl_args
    )
    return connection
k
Something is wrong here because `load(connection=connection)` is using the task. You are passing a task, not a connection. I think you need to create the connection first (by calling the task inside the flow) and then pass that result.
Also, it would be best practice to rename the function, I think.
j
AttributeError: 'tuple' object has no attribute 'to_sql'
Seems like I need to convert the requested data prior to writing it back?
k
Let me see the return type of MySQLFetch. We probably just need to convert it to pandas.
j
Sound good, thanks
k
Are you trying to write to a SQL database with `to_sql`?
j
yes
k
I can’t immediately tell what the return type is. Can you try logging the output? I suspect it’s `List[Tuple]`, but I'm not 100% sure.
Do this for a quick test:
results = MySQLFetch(...).run()
print(results)
print(type(results))
No need to be in a Flow or task for this. Just normal Python.
j
I'm getting some errors, but I believe it's a `List[Tuple]`.
k
You need to convert it to a DataFrame like this to get the `to_sql` method.
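A minimal sketch of that conversion, assuming the fetch returned a list of row tuples. The rows and column names below are made up, and `sqlite3` stands in for MySQL only so the example runs without a server (pandas accepts a `sqlite3` connection directly; for MySQL you would pass the SQLAlchemy engine instead). `DataFrame.from_records` takes the rows plus a `columns` list. If the fetch returned a single row (one tuple) rather than a list of rows, pandas ends up trying to iterate over each individual value, which is likely where an error like `'datetime.date' object is not iterable` comes from; that is a guess, and MySQLFetch's `fetch` argument (one, many, or all rows) is worth checking:

```python
import datetime
import sqlite3

import pandas as pd

# Hypothetical fetch result: a list of row tuples.
rows = [
    (1, "alpha", datetime.date(2021, 11, 1)),
    (2, "beta", datetime.date(2021, 11, 2)),
]
columns = ["id", "name", "created"]  # hypothetical column names, in query order

df = pd.DataFrame.from_records(rows, columns=columns)

# A single row (one tuple) has to be wrapped in a list to become a one-row frame.
single = pd.DataFrame.from_records([rows[0]], columns=columns)

# sqlite3 stands in for MySQL here; with MySQL, pass the SQLAlchemy engine as con.
conn = sqlite3.connect(":memory:")
df.to_sql(name="example_table", con=conn, if_exists="append", index=False)
row_count = conn.execute("SELECT COUNT(*) FROM example_table").fetchone()[0]
conn.close()
print(row_count)  # 2
```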
j
I used the from_items method and got this:
("type object 'DataFrame' has no attribute 'from_items'")
k
That does look deprecated (`DataFrame.from_items` was removed in pandas 1.0).
Method 2 will probably fit, though, I think.
j
TypeError("'datetime.date' object is not iterable")
k
Could you show me the print output?
j
of the table or the full error log?
k
The table output so we can figure out how to put it in pandas
or just part of it to give me a clue
j
sorry was debugging haha
k
I would prefer the print in the terminal so I have a feel for the data type
j
I actually managed to get past that part!
k
thumbs up
j
One last error here, on my load:
@task(log_stdout=True)
def load(connection: any, if_exists: str, db_table: str, dataframe: pd.DataFrame, credentials: dict) -> pd.DataFrame:
    res = df.to_sql(name=db_table, con=connection, if_exists=if_exists, chunksize=None, index=False)
    ## res = dataframe.to_sql(name=db_table, con=connection, credentials = PrefectSecret("XXX"), if_exists=if_exists, chunksize=None, index=False)
    print(res)
The error:
'FunctionTask' object has no attribute 'to_sql'
"df" is the DataFrame here.
k
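`FunctionTask` is something you wrapped in a `@task` decorator. This is happening because `df` is not one of the arguments passed into this function, so Python looks up `df` outside the function and most likely finds the `df` task from your flow block instead of a DataFrame. I think you want to use `dataframe.to_sql` or change your input argument to `df`.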
j
odd, same error when making both changes
k
No, no, you need one or the other, not both. The variable name needs to match the input argument.
j
oh sorry that's what I meant haha