Hello everyone I am a beginner with Prefect, in or...
# prefect-community
r
Hello everyone I am a beginner with Prefect, in order to create a Data Pipeline, I want to connect it with a SQL DB that I have locally so I used the documentation: https://docs.prefect.io/api/latest /tasks/sql_server.html#sqlserverexecute My questions are: -In order to configure the connection between SQL and Prefect I use the classes and I put it all in a .py file and I do execution?? -If Yes, at the .py code level, do a pyodbc import? - If not, how can I do? -for the args data & **kwargs what should be put? -For the password of the user what should be added to indicate it pwd or other arg?
discourse 2
a
This discussion seems to solve a similar problem and includes a sample code you could use. LMK if you have any questions after reading this
I'm moving your code block to this thread just to keep the main channel cleaner:
Copy code
import prefect
from sqlserver import SqlServerExecute

class prefect.tasks.sql_server.sql_server.SqlServerExecute(db_name="OTI_Djoliba", user="sa", host="EQ-EQ6288793\SQLEXPRESS", port=1433, driver="ODBC Driver 17 for SQL Server", query= None, data=None, commit=False, **kwargs)
if you just need to execute a simple query e.g. to insert some row, you could do:
Copy code
from prefect import Flow, Parameter, task
from prefect.tasks.sql_server import SqlServerExecute
from prefect.tasks.secrets import PrefectSecret


execute_query = SqlServerExecute(
    db_name="xxx", user="yyy", host="zzz", port=42, commit=True
)


@task
def define_query_from_param(table_name: str):
    return f"""
    INSERT INTO {table_name} ("index","id","first_name","last_name","email","gender","city","country")
    VALUES
    (1000,1001,'Anna','Geller', '<mailto:anna@example.com|anna@example.com>','Female','Berlin','Germany');
    """


with Flow("postgres_example") as flow:
    postgres_pwd = PrefectSecret("SQL_SERVER_DB_PASSWORD")
    table_name = Parameter("table_name", default="stage.customers")
    query = define_query_from_param(table_name)
    execute_query(password=postgres_pwd, query=query)

if __name__ == "__main__":
    flow.run()
r
Hi @Anna Geller Thank you, first I want to use a simple query to make a select to test the connection, and then I'll go to do an ETL. So I'll see what you shared with me, thanks 🙂
👍 2
Hi @Anna Geller in case I have a "sa" user with a password in default="stage.customers" what should I put?
a
this user name and password are for the database - it's not the same as what you use to login to MSSQL Server Management Studio. This is something that you would usually get from your DBA 🙂
r
yes, that's exactly what I did but for the default="stage.customers" parameter, what should I put!
a
this was just an example, you can ignore it
👍 1
Copy code
from prefect import Flow, task
from prefect.tasks.sql_server import SqlServerExecute
from prefect.tasks.secrets import PrefectSecret


execute_query = SqlServerExecute(
    db_name="xxx", user="yyy", host="zzz", port=42, commit=True
)


@task
def define_query():
    return """
    INSERT INTO xxx ("index","id","first_name","last_name","email","gender","city","country")
    VALUES
    (1000,1001,'Anna','Geller', '<mailto:anna@example.com|anna@example.com>','Female','Berlin','Germany');
    """


with Flow("postgres_example") as flow:
    postgres_pwd = PrefectSecret("SQL_SERVER_DB_PASSWORD")
    query = define_query()
    execute_query(password=postgres_pwd, query=query)

if __name__ == "__main__":
    flow.run()
Is it clearer now how this task works? 🙂
👍 1
r
Yes, Thank you
👍 1
Please who has an idea on the cause of this error, I tried to change the user and the server but always remains the same problem, is it related to a class of Prefect?
a
are you on Prefect Cloud? Did you set your Secret in the UI? if so, run this before running your flow:
Copy code
export PREFECT__CLOUD__USE_LOCAL_SECRETS=false
r
no I am not on Prefect Cloud
a
great, in that case setting the env variable as shown above and adding the Secret from the UI should fix the issue
r
Please who has an idea on the solution of this problem, i tried to use server= @,port instead of host=@ and port=1433, but i still have same problem: SqlServerExecute Error during execution of task: OperationalError('HYT00', '[HYT00] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0) (SQLDriverConnect)')
a
does it work just from Python code? I don't think this error is related to Prefect