Jason Motley
11/18/2021, 12:22 AM
- cloudpickle: (flow built with '1.6.0', currently running with '2.0.0')
- prefect: (flow built with '0.15.6', currently running with '0.15.9')
- python: (flow built with '3.8.8', currently running with '3.7.11')
Kevin Kho
Jason Motley
11/18/2021, 12:27 AM
Jason Motley
11/18/2021, 12:28 AM
TypeError: can't pickle SSLContext objects
Kevin Kho
@task has checkpoint=True enabled by default so that when your Flow fails, it can be restarted from the point of failure. This checkpoint=True serializes the return value of your task. I think you will get this error if you return some sort of connection or client. For that specific task, you can turn off checkpointing with checkpoint=False.
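The same failure can be reproduced without Prefect. This sketch uses the standard-library pickle module (cloudpickle, which Prefect uses to serialize results, builds on it) to show that a plain credentials dict serializes while an SSLContext, the kind of object a TLS database connection holds, does not. The `is_picklable` helper and the credential values are hypothetical.

```python
import pickle
import ssl


def is_picklable(obj) -> bool:
    """Return True if obj survives a pickle round trip, False if pickling fails."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Plain configuration values (hypothetical) serialize without trouble...
credentials = {"username": "user", "password": "secret"}
print(is_picklable(credentials))  # True

# ...but an SSLContext does not; on Python 3.7 the error reads
# "can't pickle SSLContext objects", matching the traceback in this thread.
print(is_picklable(ssl.create_default_context()))  # False
```

Turning off checkpointing skips serializing that one return value; passing only plain values such as credentials between tasks avoids handing a connection around at all.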
Jason Motley
11/18/2021, 12:32 AM
@task(log_stdout=True, checkpoint=False)
Kevin Kho
Jason Motley
11/18/2021, 12:34 AM
Jason Motley
11/18/2021, 4:08 PM
Kevin Kho
Kevin Kho
Jason Motley
11/18/2021, 4:10 PM
Kevin Kho
Jason Motley
11/18/2021, 4:10 PM
@task(log_stdout=True)
def load(connection: any, if_exists: str, db_table: str, dataframe: pd.DataFrame) -> pd.DataFrame:
    res = dataframe.to_sql(name=db_table, con=connection, if_exists=if_exists, chunksize=None, index=False)
    print(res)
Jason Motley
11/18/2021, 4:11 PM
Jason Motley
11/18/2021, 4:11 PM
import pymysql as pmy  # assumed: pmy is the PyMySQL module

@task(log_stdout=True, checkpoint=False)
def db_connection(credentials: dict) -> any:
    ssl_args = {...}  # SSL settings omitted in the original ("these are fine")
    user = credentials['username']
    password = credentials['password']
    url = 'confidential-url'
    port = 3306
    connection = pmy.connect(user=user, password=password, host=url, port=port, connect_timeout=10, ssl=ssl_args,
                             charset='utf8')
    return connection
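Returning the connection from its own task means the live connection object has to travel between tasks. An alternative is to pass only plain, serializable values (credentials, table name, rows) and open and close the connection inside the task that uses it. This sketch shows that shape with the standard-library sqlite3 module standing in for MySQL; `load_rows`, the table layout and the file path are hypothetical.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def load_rows(db_path: str, table: str, rows: list) -> int:
    """Open a connection, insert rows, commit, close; return the table's row count.

    Only plain values cross the function boundary, so nothing here needs to
    pickle a live connection. `table` is assumed to be a trusted identifier.
    """
    with closing(sqlite3.connect(db_path)) as conn:  # closing() guarantees conn.close()
        with conn:  # commits on success, rolls back on error
            conn.execute(f"CREATE TABLE IF NOT EXISTS {table} (id INTEGER, name TEXT)")
            conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", rows)
        (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    return count


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.db")
    print(load_rows(path, "people", [(1, "ada"), (2, "grace")]))  # 2
    print(load_rows(path, "people", [(3, "linus")]))  # 3
```

In a flow, one way to apply this would be to give the load task the credentials and build the engine inside it, rather than receiving an engine from another task.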
Kevin Kho
Kevin Kho
Jason Motley
11/18/2021, 4:12 PM
Kevin Kho
Kevin Kho
Jason Motley
11/18/2021, 4:13 PM
pmy.connect portion with create_engine?
Kevin Kho
to_sql call
Jason Motley
11/18/2021, 4:16 PM
Kevin Kho
Jason Motley
11/18/2021, 4:17 PM
Kevin Kho
Jason Motley
11/18/2021, 4:18 PM
Kevin Kho
mysql = MySQLFetch()
with Flow("...") as flow:
    a = mysql(db_name, ...)
to get stuff from the database
Jason Motley
11/18/2021, 4:24 PM
Kevin Kho
Jason Motley
11/18/2021, 4:34 PM
Kevin Kho
Jason Motley
11/18/2021, 4:35 PM
Jason Motley
11/18/2021, 4:35 PM
Jason Motley
11/18/2021, 4:52 PM
Jason Motley
11/18/2021, 4:53 PM
@task(log_stdout=True, checkpoint=False)
def db_connection(credentials: dict) -> any:
    db = MySQLFetch(...)  # arguments omitted in the original ("bla bla")
    return MySQLFetch  # returns the MySQLFetch class itself, not db
Kevin Kho
Kevin Kho
@task(log_stdout=True, checkpoint=False)
def db_connection(credentials: dict) -> any:
    df = MySQLFetch(...).run()  # arguments omitted in the original ("bla bla")
    return df
Note this doesn't return a connection. It returns data already.
Jason Motley
11/18/2021, 4:56 PM
Jason Motley
11/18/2021, 5:02 PM
Kevin Kho
from prefect.tasks.mysql import MySQLFetch
I think
Kevin Kho
Jason Motley
11/18/2021, 5:08 PM
Kevin Kho
library(…)
Jason Motley
11/18/2021, 5:27 PM
TypeError: can't concat list to bytes?
Kevin Kho
Jason Motley
11/18/2021, 5:55 PM
Jason Motley
11/18/2021, 5:56 PM
import pandas as pd
from prefect import task, Flow
from prefect.tasks.mysql import MySQLFetch
from prefect.tasks.secrets import PrefectSecret

@task(log_stdout=True)
def db_connection(credentials: dict) -> any:
    import sqlalchemy as db
    ssl_args = {"XXX"}  ## for local agent testing
    user = credentials['username']
    password = credentials['password']
    url = 'XXX'
    port = XXX
    connection = db.create_engine(
        f'mysql://{user}:{password}@{url}:{port}/?charset=utf8',
        connect_args=ssl_args
    )
    return connection

@task(log_stdout=True, checkpoint=False)
def extract(credentials: dict) -> any:
    ssl_args = {"XXXX"}  ## for local agent testing
    query = "QUERY HERE"
    df = MySQLFetch(user=['username'],      # a list literal, not credentials['username']
                    password=['password'],  # likewise, not credentials['password']
                    host='XXX',
                    port=XXXX,
                    ssl=ssl_args,
                    charset='utf8',
                    query=query).run()
    return df

# This is where we will do any necessary date transformations
@task(log_stdout=True)
def transform(df: pd.DataFrame) -> pd.DataFrame:
    # print(dataframe)
    return df

@task(log_stdout=True)
def load(connection: any, if_exists: str, db_table: str, dataframe: pd.DataFrame) -> pd.DataFrame:
    res = dataframe.to_sql(name=db_table, con=connection, if_exists=if_exists, chunksize=None, index=False)
    print(res)

# Flow
flow = Flow("Flow Name Here")
with flow:
    df = extract(credentials=PrefectSecret("SECRET"))
    sink = transform(df=df)
    # note: nothing in this script defines `connection`
    load(connection=connection, if_exists='append', db_table='XXXXX', dataframe=sink)
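The "TypeError: can't concat list to bytes" reported earlier fits the user=['username'] and password=['password'] arguments in this script: `['username']` is a one-element list literal, not a lookup in `credentials`. That the MySQL driver builds bytes from the user name internally is an assumption; this sketch only reproduces the Python-level error, with hypothetical credential values.

```python
credentials = {"username": "user", "password": "secret"}  # hypothetical values

wrong = ['username']             # a list containing the string 'username'
right = credentials['username']  # the value stored under the 'username' key
print(type(wrong).__name__, type(right).__name__)  # list str

# Concatenating a list onto bytes raises the same TypeError seen in the thread.
try:
    b"user=" + wrong
except TypeError as exc:
    print(exc)  # can't concat list to bytes

print(b"user=" + right.encode("utf-8"))  # b'user=user'
```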
Kevin Kho
Jason Motley
11/18/2021, 6:48 PM
db_connection
Kevin Kho
Jason Motley
11/18/2021, 7:02 PM
Kevin Kho
Kevin Kho
credentials['username']?
Jason Motley
11/18/2021, 7:18 PM
Jason Motley
11/18/2021, 7:18 PM
Jason Motley
11/18/2021, 7:18 PM
TypeError: connection() missing 1 required positional argument: 'credentials'
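As Kevin points out further down, this error comes from passing the connection task itself (load(connection=connection, ...)) rather than the result of calling it. In plain Python the mistake shows up as an AttributeError on the function object; in a Prefect 0.15 flow, as far as this thread suggests, a task passed as an argument is added as an upstream task and run with no inputs, which would produce exactly this missing-argument error. The sketch uses hypothetical stand-ins (`FakeConnection`, `make_connection`, `load`) rather than Prefect or SQLAlchemy.

```python
class FakeConnection:
    """Hypothetical stand-in for a database engine or connection."""

    def execute(self, sql: str) -> str:
        return f"ran: {sql}"


def make_connection(credentials: dict) -> FakeConnection:
    """Stand-in for the `connection` task: builds a connection from credentials."""
    return FakeConnection()


def load(connection, sql: str) -> str:
    """Stand-in for the `load` task: uses whatever it is given as a connection."""
    return connection.execute(sql)


creds = {"username": "user", "password": "secret"}  # hypothetical values

# Wrong: hands over the function object itself, not a connection.
try:
    load(connection=make_connection, sql="SELECT 1")
except AttributeError as exc:
    print(exc)  # 'function' object has no attribute 'execute'

# Right: call it first, then pass the result along.
conn = make_connection(creds)
print(load(connection=conn, sql="SELECT 1"))  # ran: SELECT 1
```

In the flow, the corresponding shape would be something like conn = connection(credentials=PrefectSecret("XXX")) followed by load(connection=conn, ...), keeping the redacted secret name from the thread.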
Jason Motley
11/18/2021, 7:19 PM
Kevin Kho
Jason Motley
11/18/2021, 7:22 PM
# Flow
flow = Flow("XXX")
with flow:
    df = extract(credentials=PrefectSecret("XXXX"))
    sink = transform(df=df)
    load(connection=connection, credentials=PrefectSecret("XXX"), if_exists='append', db_table='XXXt', dataframe=sink)
Jason Motley
11/18/2021, 7:22 PM
Jason Motley
11/18/2021, 7:23 PM
@task(log_stdout=True, checkpoint=False)
def connection(credentials: dict) -> any:
    import sqlalchemy as db
    ssl_args = {"XXX"}  ## for local agent testing
    user = credentials['username']
    password = credentials['password']
    url = 'XXX'
    port = XXXX
    connection = db.create_engine(
        f'mysql://{user}:{password}@{url}:{port}/?charset=utf8',
        connect_args=ssl_args
    )
    return connection
Kevin Kho
load(connection=connection) is using the task. I think you need to create the connection first and then pass it. You are passing a task.
Kevin Kho
Jason Motley
11/18/2021, 7:28 PM
AttributeError: 'tuple' object has no attribute 'to_sql'
Jason Motley
11/18/2021, 7:28 PM
Kevin Kho
Jason Motley
11/18/2021, 7:31 PM
Kevin Kho
to_sql?
Jason Motley
11/18/2021, 7:34 PM
Kevin Kho
List[Tuple], but not 100% sure
Kevin Kho
results = MySQLFetch(...).run()
print(results)
print(type(results))
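The shapes Kevin is guessing at come from the DB-API cursor methods: `fetchone()` returns a single row as a tuple and `fetchall()` returns a list of tuples, and neither has a `to_sql` method, which belongs to pandas DataFrames. MySQLFetch takes a `fetch` argument ("one", "many" or "all"); assuming it maps onto these cursor calls, fetching a single row would explain the 'tuple' object has no attribute 'to_sql' error. The sketch uses sqlite3 and a hypothetical table `t`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, day TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, "2021-11-18"), (2, "2021-11-19")])

cur = conn.execute("SELECT id, day FROM t ORDER BY id")
one = cur.fetchone()  # a single row, returned as a tuple
print(type(one).__name__, one)  # tuple (1, '2021-11-18')

cur = conn.execute("SELECT id, day FROM t ORDER BY id")
rows = cur.fetchall()  # every row, returned as a list of tuples
print(type(rows).__name__, rows)  # list [(1, '2021-11-18'), (2, '2021-11-19')]

# cursor.description holds one 7-item sequence per column; item 0 is the name.
columns = [col[0] for col in cur.description]
print(columns)  # ['id', 'day']
conn.close()
```

With pandas installed, pd.DataFrame(rows, columns=columns) turns such a list of tuples into a DataFrame that does have to_sql. Note that DataFrame.from_items was removed in pandas 1.0, so code that still calls it fails on newer pandas.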
Kevin Kho
Jason Motley
11/18/2021, 7:51 PM
Kevin Kho
to_sql method
Jason Motley
11/18/2021, 7:59 PM
("type object 'DataFrame' has no attribute 'from_items'")
Kevin Kho
Kevin Kho
Jason Motley
11/18/2021, 8:40 PM
TypeError("'datetime.date' object is not iterable")
Kevin Kho
Jason Motley
11/18/2021, 8:53 PM
Kevin Kho
Kevin Kho
Jason Motley
11/18/2021, 9:25 PM
Jason Motley
11/18/2021, 9:39 PM
Kevin Kho
Jason Motley
11/18/2021, 10:17 PM
Kevin Kho
Jason Motley
11/18/2021, 10:21 PM
Jason Motley
11/18/2021, 10:21 PM
@task(log_stdout=True)
def load(connection: any, if_exists: str, db_table: str, dataframe: pd.DataFrame, credentials: dict) -> pd.DataFrame:
    res = df.to_sql(name=db_table, con=connection, if_exists=if_exists, chunksize=None, index=False)
    ## res = dataframe.to_sql(name=db_table, con=connection, credentials = PrefectSecret("XXX"), if_exists=if_exists, chunksize=None, index=False)
    print(res)
Jason Motley
11/18/2021, 10:21 PM
'FunctionTask' object has no attribute 'to_sql'
Jason Motley
11/18/2021, 10:22 PM
Kevin Kho
FunctionTask is something you wrapped in a @task decorator. This is happening because df is not something being passed into this function. I think you want to use dataframe.to_sql or change your input to df.
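What Kevin describes is ordinary Python name lookup: inside `load`, `df` is not a parameter, so Python falls back to the module-level `df`, which in the flow script is bound to the task returned by df = extract(...) inside the `with flow:` block rather than to a DataFrame. The sketch reproduces this with hypothetical stand-in classes in place of Prefect's FunctionTask and a pandas DataFrame.

```python
class FunctionTaskStandIn:
    """Hypothetical stand-in for the task object that `df = extract(...)` binds."""


class FrameStandIn:
    """Hypothetical stand-in for a pandas DataFrame."""

    def to_sql(self, name: str) -> str:
        return f"wrote {name}"


df = FunctionTaskStandIn()  # module-level name, like `df` in the flow block


def load_buggy(dataframe, db_table: str) -> str:
    # `df` is not a parameter, so this resolves to the module-level task object.
    return df.to_sql(name=db_table)


def load_fixed(dataframe, db_table: str) -> str:
    # Use the argument that was actually passed in.
    return dataframe.to_sql(name=db_table)


try:
    load_buggy(FrameStandIn(), "events")
except AttributeError as exc:
    print(exc)  # 'FunctionTaskStandIn' object has no attribute 'to_sql'

print(load_fixed(FrameStandIn(), "events"))  # wrote events
```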
Jason Motley
11/18/2021, 10:28 PM
Kevin Kho
Jason Motley
11/18/2021, 10:29 PM