Jason Motley
11/22/2021, 8:41 PM
extract => transform => delete 14 days in target => append last 22 days w/ no dupes. Right now I'm doing 2 parallel ETLs, one which ends in replacing the existing data and the other which appends. This seems very slow since I have to perform 2 full extracts.
Kevin Kho
with Flow(...) as flow:
    e = extract()
    t1 = transform1(e)
    d = delete(t1)
    t2 = transform2(e)
    u = upsert(e, t2)  # this function will be responsible for the dedup
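A minimal sketch of the single-extract pattern above, using the standard-library sqlite3 module in place of MySQL. The table name `target`, its columns, and the helper `refresh_window` are hypothetical; only the 14-day delete window and the 22-day append window come from the question. Deduplication here leans on a primary key plus `INSERT OR IGNORE`, which is one option among several.

```python
import sqlite3
from datetime import date, timedelta


def refresh_window(conn, rows, today, delete_days=14, append_days=22):
    """Delete the last `delete_days` from target, then append the last
    `append_days` of extracted rows, skipping keys already present."""
    delete_from = (today - timedelta(days=delete_days)).isoformat()
    append_from = (today - timedelta(days=append_days)).isoformat()
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute("DELETE FROM target WHERE DateID >= ?", (delete_from,))
        recent = [r for r in rows if r[1] >= append_from]
        # INSERT OR IGNORE relies on the primary key to skip duplicates.
        conn.executemany(
            "INSERT OR IGNORE INTO target (id, DateID, amount) VALUES (?, ?, ?)",
            recent,
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE target (id INTEGER PRIMARY KEY, DateID TEXT, amount REAL)"
)
today = date(2021, 11, 22)

# Existing data: an old row, a row 18 days back, a stale row inside the 14-day window.
conn.executemany(
    "INSERT INTO target VALUES (?, ?, ?)",
    [(1, "2021-10-01", 10.0), (2, "2021-11-04", 20.0), (3, "2021-11-20", 30.0)],
)

# One extract covering the last 22 days: row 2 overlaps, row 3 is updated, row 4 is new.
extracted = [
    (2, "2021-11-04", 20.0),
    (3, "2021-11-20", 35.0),
    (4, "2021-11-21", 40.0),
]

refresh_window(conn, extracted, today)
result = conn.execute("SELECT id, DateID, amount FROM target ORDER BY id").fetchall()
```

Because the delete and the append share one extract and one transaction, the source is only read once and a failure midway leaves the target unchanged.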
Jason Motley
11/22/2021, 8:52 PM
Jason Motley
11/22/2021, 8:52 PM
Kevin Kho
drop_duplicates before you insert.
Jason Motley
11/22/2021, 8:59 PM
Kevin Kho
Jason Motley
11/22/2021, 9:04 PM
Kevin Kho
Kevin Kho
Jason Motley
11/22/2021, 9:27 PM
Kevin Kho
mysql = MySQLExecute()
with Flow(...) as flow:
    host = Secret("MYHOST")
    mysql(host=host, query=..., ...)
it would look like this
Kevin Kho
The List[List] structure from this is hard to work with, I suggest you just use pd.read_sql also
@task
def read_sql(...):
    df = pd.read_sql(...)
    return df
this will directly read from a database and return the DF
Jason Motley
11/22/2021, 9:53 PM
Jason Motley
11/22/2021, 9:53 PM
Kevin Kho
INSERT INTO...SELECT... I think
Jason Motley
11/22/2021, 10:37 PM
Kevin Kho
Jason Motley
11/22/2021, 10:40 PM
@task(log_stdout=True)
def transform_table():
    from prefect.tasks.mysql import MySQLExecute as MSQE
    host = PrefectSecret("MYSECRET")
    MSQE(db_name='DDDDD', host=host, port=3306,
         query=f"""delete from DDDDD
         where DateID >= f'{days_back}'""",
         ssl=ssl_args)
flow = Flow("CallbackDataReal")
with flow:
    transform_table()  # delete 1
Jason Motley
11/22/2021, 10:41 PM
Jason Motley
11/22/2021, 10:42 PM
transform_table() task in the flow will be the normal extract, transform, and load, into the transformed destination table.
Kevin Kho
PrefectSecret is a task and should only be used inside the flow block. MySQLExecute is already a task so you should not wrap it in a task like this unless you call the run method explicitly.
So either do MSQE(...).run() or put that in the flow. If you want it in the task though, you need to pass in MYSECRET to transform_table because PrefectSecret can’t be used in there.
Kevin Kho
The Task class has a run method and an __init__ method. The init is not deferred but the run method is deferred. The Flow block is responsible for calling the run method.
Kevin Kho
mysql = MySQLExecute()
with Flow(...) as flow:
    mysql()
The one above the Flow is the init. The one inside the flow is the run method (the flow calls it automatically).
Kevin Kho
with Flow(...) as flow:
    MySQLExecute(...)(...)
The first () is the init and the second () is the run.
Kevin Kho
If you use MySQLExecute inside the other task like what you are doing, you need to call .run() explicitly because it’s outside of the Flow block. You need to do:
@task(log_stdout=True)
def transform_table():
    from prefect.tasks.mysql import MySQLExecute as MSQE
    MSQE(db_name='DDDDD', host=host, port=3306,
         query=f"""delete from DDDDD
         where DateID >= '{days_back}'""",
         ssl=ssl_args).run()
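The init/run distinction can be seen without Prefect at all. The sketch below uses a hypothetical `ToyTask` class (a stand-in, not Prefect's `Task`) to show that constructing the object does no work and that only an explicit `.run()` call executes anything.

```python
class ToyTask:
    """Stand-in for a task class: __init__ stores config, run() does the work."""

    def __init__(self, query=None):
        # Evaluated immediately, when the object is constructed.
        self.query = query
        self.executed = []

    def run(self, query=None):
        # Evaluated only when called; a run-time query overrides the init-time one.
        query = query or self.query
        if query is None:
            raise ValueError("a query string is required")
        self.executed.append(query)
        return query


task = ToyTask(query="delete from t where DateID >= '2021-11-08'")
ran_after_init = list(task.executed)  # nothing has run yet
returned = task.run()                 # the explicit call does the work
ran_after_run = list(task.executed)
```

Inside a Flow block the flow makes the second call for you; inside another task's function body nothing does, which is why the explicit `.run()` is needed there.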
Kevin Kho
PrefectSecret is only meant to be used in the Flow block, and the Flow is responsible for calling it at runtime. There are two interfaces for secrets. PrefectSecret for inside the Flow:
with Flow(...) as flow:
    host = PrefectSecret("MYHOST")
or if outside the Flow:
@task
def abc():
    from prefect.client import Secret
    my_secret_value = Secret("MYSECRET").get()
as seen here
Jason Motley
11/22/2021, 10:58 PM
Kevin Kho
PrefectSecret instead of Secret, like this:
mysql = MySQLExecute()
with Flow(...) as flow:
    host = PrefectSecret("MYHOST")
    mysql(host=host, query=..., ...)
Jason Motley
11/22/2021, 11:00 PM
UserWarning: A Task was passed as an argument to MySQLExecute, you likely want to first initialize MySQLExecute with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:
Jason Motley
11/22/2021, 11:01 PM
Kevin Kho
PrefectSecret is a task. Just a special kind of one.
Jason Motley
11/22/2021, 11:02 PM
Jason Motley
11/22/2021, 11:02 PM
Jason Motley
11/22/2021, 11:03 PM
Kevin Kho
PrefectSecret into a task in the Flow block or you can use Secret.get() instead
Jason Motley
11/22/2021, 11:03 PM
Jason Motley
11/22/2021, 11:04 PM
MSQE(db_name='DB', host=host, port=3306,
     query=f"""delete from DB.TestTest
     where DateID >= f'{days_back}'""",
     ssl=ssl_args).run()
Jason Motley
11/22/2021, 11:04 PM
ssl_args is defined globally and the host = PrefectSecret("MySecret") within the flow... or am I misunderstanding where to put the PrefectSecret statement?
Kevin Kho
PrefectSecret absolutely only goes in the Flow block.
Jason Motley
11/22/2021, 11:06 PM
Kevin Kho
Secret(…).get() like this:
@task
def abc():
    from prefect.client import Secret
    my_secret_value = Secret("MYSECRET").get()
    other_task(my_secret_value).run()
Jason Motley
11/22/2021, 11:08 PM
with flow:
    from prefect.tasks.mysql import MySQLExecute as MSQE
    host = PrefectSecret("MySecret")
    MSQE(db_name='DB', host=host, port=3306,
         query=f"""delete from DB.TestTest
         where DateID >= f'{days_back}'""",
         ssl=ssl_args).run()
Kevin Kho
with Flow(...) as flow:
    MyTask()()
The first set of parentheses is the init and the second is the run. The init is evaluated immediately. The problem is that the PrefectSecret does not have a value at init time; it only gets a value at runtime. The .run() is also called by the Flow. So what you want is to pass it to the run () instead of the init, like this:
with flow:
    from prefect.tasks.mysql import MySQLExecute as MSQE
    host = PrefectSecret("MySecret")
    MSQE()(db_name='DB', host=host, port=3306,
           query=f"""delete from DB.TestTest
           where DateID >= '{days_back}'""",
           ssl=ssl_args)
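Why the value has to go into the second set of parentheses can be sketched with a toy deferred secret. `ToySecret`, `ToyExecute`, and `toy_flow_run` are hypothetical stand-ins, not Prefect classes; the only point is that the secret is still an unresolved object at init time and becomes a string only once a runner resolves it.

```python
class ToySecret:
    """Placeholder whose real value exists only once the flow is running."""

    def __init__(self, name):
        self.name = name

    def resolve(self, store):
        return store[self.name]


class ToyExecute:
    def __init__(self, host=None):
        self.host = host  # stored as-is at init time, resolved or not

    def run(self, host=None):
        host = host if host is not None else self.host
        if not isinstance(host, str):
            raise TypeError("host must be a string, got " + type(host).__name__)
        return "connected to " + host


def toy_flow_run(task, store, **kwargs):
    """Mimic a flow runner: resolve deferred arguments, then call run()."""
    resolved = {
        key: value.resolve(store) if isinstance(value, ToySecret) else value
        for key, value in kwargs.items()
    }
    return task.run(**resolved)


store = {"MySecret": "db.example.internal"}  # made-up secret value
secret = ToySecret("MySecret")

# Placeholder handed to init: run() later sees the unresolved object.
try:
    ToyExecute(host=secret).run()
    init_outcome = "ok"
except TypeError as exc:
    init_outcome = str(exc)

# Placeholder handed over at run time via the runner: resolved first.
run_outcome = toy_flow_run(ToyExecute(), store, host=secret)
```

The first call fails because nothing ever had the chance to swap the placeholder for its value; the second succeeds because the runner sits between the placeholder and `run()`.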
Kevin Kho
Jason Motley
11/22/2021, 11:15 PM
ValueError: A query string must be provide
Jason Motley
11/22/2021, 11:17 PM
Kevin Kho
Jason Motley
11/22/2021, 11:17 PM
Jason Motley
11/22/2021, 11:18 PM
Jason Motley
11/22/2021, 11:19 PM
Error during execution of task: TypeError('getaddrinfo() argument 1 must be string or None')
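For reference, this is the TypeError that Python's own socket.getaddrinfo raises whenever the host argument is neither a string (or bytes) nor None. The sketch below only reproduces the error with a placeholder object; it does not establish which library passed a non-string host in the flow above.

```python
import socket

placeholder = object()  # stands in for any non-string host value

try:
    # The type check on the host happens before any lookup is attempted.
    socket.getaddrinfo(placeholder, 3306)
    message = None
except TypeError as exc:
    message = str(exc)
```

Printing the type of the host value just before the connection is made is a quick way to check whether this is what is happening.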
Kevin Kho
Jason Motley
11/22/2021, 11:21 PM
Kevin Kho
flow.run()
Jason Motley
11/22/2021, 11:24 PM
Kevin Kho
with Flow(...) as flow:
    ...
flow.run()
instead of registering it
Kevin Kho
Jason Motley
11/22/2021, 11:28 PM
Kevin Kho
Jason Motley
11/22/2021, 11:31 PM
Kevin Kho
Jason Motley
11/22/2021, 11:32 PM
Kevin Kho
Jason Motley
11/22/2021, 11:34 PM
Kevin Kho
aiohttp when ipv6 is enabled. Do you know if you have ipv6 enabled? This issue kinda shows what is going on. I dunno a lot, but I would try upgrading to aiohttp 3.8 and above. This looks like it got fixed last month.
Kevin Kho
Kevin Kho