# ask-community
j
Is there a good way to read in custom SQL, then delete 14 days of data from the target table, then insert the results of that custom SQL for the last 22 days into the target table, ensuring no duplicate records? Something like `extract => transform => delete 14 days in target => append last 22 days w/ no dupes`. Right now I'm doing 2 parallel ETLs, one which ends in replacing the existing data and the other which appends. This seems very slow since I have to perform 2 full extracts.
k
It sounds like you should be able to if you structure your flow this way, right?
Copy code
with Flow(...) as flow:
    e = extract()
    t1 = transform1(e)
    d = delete(t1)
    t2 = transform2(e)
    u = upsert(e, t2)  # this function will be responsible for the dedup
j
Is there a way to delete the data in the source itself (the database)? Or do you just have to do it in python?
Also do you have a good de-dupe function handy? I'm assuming I could use an index?
k
For deleting, you should be able to use the SQL connector with a query directly. For de-dup… I believe an index will give you an error if there is an existing value. You can either look to UPSERT, or you can bring this into Pandas, query for existing values, and then use `drop_duplicates` before you insert.
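As a rough illustration of that Pandas route, here is a minimal sketch; the table name, key columns, and connection string are placeholder assumptions, not from this thread:
Copy code
import pandas as pd
from sqlalchemy import create_engine
from prefect import task

@task
def append_without_dupes(new_df: pd.DataFrame) -> None:
    # placeholder connection string and table/column names
    engine = create_engine("mysql+pymysql://user:pass@host:3306/db")
    # pull the keys that already exist in the target table
    existing = pd.read_sql("SELECT DateID, RecordID FROM target_table", engine)
    # anti-join: keep only rows whose keys are not already in the target
    merged = new_df.merge(existing, on=["DateID", "RecordID"], how="left", indicator=True)
    to_insert = merged[merged["_merge"] == "left_only"].drop(columns="_merge")
    # drop duplicates within the new batch itself, then append
    to_insert = to_insert.drop_duplicates(subset=["DateID", "RecordID"])
    to_insert.to_sql("target_table", engine, if_exists="append", index=False)
This mirrors the "query for existing values, then drop_duplicates" suggestion above.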
j
Great, thank you!
k
have you seen the SQL tasks in the task library?
j
You'd sent the MySQLFetch task, which didn't end up working; is there a broader library?
k
For the delete part, there is MySQLExecute
Could you post your latest code and we can see why it didn’t work?
j
Yeah, let me try to rework it and post if it fails again! Is there a way to just pass in your secret to populate the MySQLExecute parameters (username, port, etc.)?
k
Copy code
mysql = MySQLExecute()
with Flow(...) as flow:
    host = Secret("MYHOST")
    mysql(host=host, query=..., ...)
it would look like this
If the `List[List]` structure from this is hard to work with, I suggest you just use `pd.read_sql`. Also:
Copy code
@task
def read_sql(...):
    df = pd.read_sql(...)
    return df
this will directly read from a database and return the DF
j
this worked! are there ways to remove duplicates in the db as a part of the final insert?
i.e. as a part of the load statement
k
Not as straightforward. In general you need something like `INSERT INTO ... SELECT ...`, I think.
j
Okay, so this is the code that is not deleting the relevant rows
k
So you select the difference of the datasets and then insert that
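For illustration, a hedged sketch of that "insert the difference" pattern through the same MySQLExecute task; the staging table, target table, and key columns are assumptions, not from this thread (credentials/ssl are omitted just as elsewhere in the thread):
Copy code
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.mysql import MySQLExecute

# Assumes the transformed rows were first loaded into a staging table;
# table and column names below are placeholders.
insert_difference = MySQLExecute(db_name="mydb", port=3306)

with Flow("insert-difference") as flow:
    host = PrefectSecret("MYHOST")
    insert_difference(
        host=host,
        query="""
            INSERT INTO target_table
            SELECT s.*
            FROM staging_table AS s
            LEFT JOIN target_table AS t
              ON t.DateID = s.DateID AND t.RecordID = s.RecordID
            WHERE t.DateID IS NULL
        """,
    )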
j
Copy code
@task(log_stdout=True)
def transform_table():
    from prefect.tasks.mysql import MySQLExecute as MSQE
    host = PrefectSecret("MYSECRET")
    MSQE(db_name='DDDDD', host=host, port=3306, 
    query=f"""delete from DDDDD
     where DateID >= f'{days_back}'""",
     ssl=ssl_args)

flow = Flow("CallbackDataReal")
with flow:
    transform_table() #delete 1
days_back is defined globally just fyi
Below the `transform_table()` task in the flow will be the normal extract, transform, and load into the transformed destination table.
k
`PrefectSecret` is a task and should only be used inside the flow block. `MySQLExecute` is already a task so you should not wrap it in a task like this unless you call the `run` method explicitly. So either do `MSQE(...).run()` or put that in the flow. If you want it in the task though, you need to pass in `MYSECRET` to `transform_table` because `PrefectSecret` can't be used in there.
So the `Task` class has a `run` method and an `__init__` method. The init is not deferred but the run method is deferred. The Flow block is responsible for calling the run method.
So in the following syntax:
Copy code
mysql = MySQLExecute()
with Flow(...) as flow:
    mysql()
The one above the Flow is the init. The one inside the flow is the run method (the flow calls it automatically)
You can also do:
Copy code
with Flow(...) as flow:
    MySQLExecute(...)(...)
The first () is the init and the second () is the run
If you use `MySQLExecute` inside the other task like what you are doing, you need to call `.run()` explicitly because it's outside of the Flow block. You need to do:
Copy code
@task(log_stdout=True)
def transform_table():
    from prefect.tasks.mysql import MySQLExecute as MSQE
    MSQE(db_name='DDDDD', host=host, port=3306, 
    query=f"""delete from DDDDD
     where DateID >= f'{days_back}'""",
     ssl=ssl_args).run()
But the `PrefectSecret` is only meant to be used in the Flow block, and the Flow will be responsible for calling it during runtime. There are two interfaces for secrets: `PrefectSecret` for inside the Flow:
Copy code
with Flow(...) as flow:
    host = PrefectSecret("MYHOST")
or if outside the Flow:
Copy code
@task
def abc():
    from prefect.client import Secret
    my_secret_value = Secret("MYSECRET").get()
as seen here
j
Just catching up on this sorry
k
I had a typo earlier, it should be `PrefectSecret` instead of `Secret`, like this:
Copy code
mysql = MySQLExecute()
with Flow(...) as flow:
    host = PrefectSecret("MYHOST")
    mysql(host=host, query=..., ...)
j
I got this error on commit:
UserWarning: A Task was passed as an argument to MySQLExecute, you likely want to first initialize MySQLExecute with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:
Running the setup where I call it entirely in the flow
k
That’s because `PrefectSecret` is a task. Just a special kind of one.
j
ahh
misread it
So should I be replacing PrefectSecret with "Secret" in this part of the flow:
k
So you can only pass `PrefectSecret` into a task in the Flow block, or you can use `Secret.get()` instead.
j
Could you provide the .get() example in the flow block?
This is what I'm operating on:
Copy code
MSQE(db_name='DB', host=host, port=3306, 
    query=f"""delete from DB.TestTest
     where DateID >= f'{days_back}'""",
     ssl=ssl_args).run()
where `ssl_args` is defined globally and `host = PrefectSecret("MySecret")` within the flow.... or am I misunderstanding where to put the PrefectSecret statement?
k
Other way around. `PrefectSecret` absolutely only goes in the Flow block.
j
I'm in the flow block, so...?
k
Otherwise it’s `Secret(…).get()`, like this:
Copy code
@task
def abc():
    from prefect.client import Secret
    my_secret_value = Secret("MYSECRET").get()
    other_task(my_secret_value).run()
j
This is what is erroring:
Copy code
with flow:
    from prefect.tasks.mysql import MySQLExecute as MSQE
    host = PrefectSecret("MySecret")
    MSQE(db_name='DB', host=host, port=3306, 
    query=f"""delete from DB.TestTest
     where DateID >= f'{days_back}'""",
     ssl=ssl_args).run()
k
So if you do:
Copy code
with Flow(...) as flow:
    MyTask()()
the first parenthesis is the init and the second is the run. The init is evaluated immediately. The problem is that the PrefectSecret does not have a value during init time; it only gets a value during runtime. The `.run()` is also called by the Flow. So what you want is to pass the arguments to the run call `()` instead of the init, like this:
Copy code
with flow:
    from prefect.tasks.mysql import MySQLExecute as MSQE
    host = PrefectSecret("MySecret")
    MSQE()(db_name='DB', host=host, port=3306, 
    query=f"""delete from DB.TestTest
     where DateID >= f'{days_back}'""",
     ssl=ssl_args)
The run method will be deferred to the flow run execution
j
Got one step closer:
ValueError: A query string must be provide
(there is a query string provided) 🙂
k
Let me check if that f-string is evaluated right one second
j
1 sec
fixed something, testing the run now
It got to prefect UI but gave this error:
Error during execution of task: TypeError('getaddrinfo() argument 1 must be string or None')
k
This is new to me. Is your agent on the same machine as where you developed?
j
Yeah, running a local agent before I push this to production eventually.
k
Do you get this too with `flow.run()`?
j
What do you mean?
k
Copy code
with Flow(…) as flow:
   ....

flow.run()
instead of registering it
Oh sorry that was a question two msgs ago
j
I can't quite debug flow.run() without committing since my secret is stored on the server
k
You are on Prefect Cloud right? You can do `export PREFECT__CLOUD__USE_LOCAL_SECRETS=false` to always pull from Prefect Cloud even for local runs
j
where would I put that?
k
Set an environment variable like in a terminal
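For local debugging, a hedged sketch of setting that variable from Python instead of the terminal; this assumes Prefect builds its config from environment variables at import time, so the assignment has to come before `import prefect`:
Copy code
import os

# Assumption: Prefect reads config env vars when the library is imported,
# so this must run before `import prefect`.
os.environ["PREFECT__CLOUD__USE_LOCAL_SECRETS"] = "false"

import prefect  # noqa: E402
from prefect import Flow  # noqa: E402
This only affects the local flow.run() process; the agent keeps whatever configuration its own environment has.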
j
hm, I might not have permissions
k
Ah ok let me dig a bit into that error. It’s likely with pymysql
j
Much appreciated. I'll check the thread in the AM, thanks!!
k
So it looks like the issue is related to `aiohttp` when `ipv6` is enabled. Do you know if you have `ipv6` enabled? This issue kinda shows what is going on. I dunno a lot, but I would try upgrading to `aiohttp` 3.8 and above. This looks like it got fixed last month.
This is the pymysql code under the hood. It’s pretty simple and there is nothing fancy, so I think this is just a matter of finding the right versions for pymysql and its dependencies. Pymysql hasn’t released a new version since Jan though.
I am out of office. Will be back Monday, so I suggest posting a new thread instead of replying.