Andrea Nerla
12/28/2021, 12:53 PMAndrea Nerla
12/28/2021, 12:54 PMimport prefectimport osimport pyodbcfrom prefect import task, Flow@taskdef table_creation():logger = prefect.context.get("logger")conn = pyodbc.connect('Driver={SQL Server};' 'Server=localhost\sqlserver;' 'Database=test;' 'user=prova;' 'password=provaprova;')cursor = conn.cursor()file = open(r"C:\Users\andrea.nerla\Desktop\sidal\sql sidal 2\quote details table creazione.sql", 'r')sql = file.read()file.close()cursor.execute(sql)conn.commit()cursor.close()conn.close()@taskdef extract_load():return os.system(r'"python C:\Users\andrea.nerla\Desktop\sidal\python_per_test\test_quotedetails.py"')@taskdef transform():#    logger = prefect.context.get("logger")#    conn = pyodbc.connect('Driver={SQL Server};' 'Server=localhost\sqlserver;' 'Database=test;' 'user=prova;' 'password=provaprova;')#    cursor = conn.cursor()file = open(r"C:\Users\andrea.nerla\Desktop\sidal\sql sidal 2\quote details stored proc.sql", 'r')sql = file.read()file.close()#    cursor.execute(sql)#    conn.commit()#    conn.close()with Flow("quotedetails_flow") as flow:table_creation()extract_load()transform()flow.register(project_name="quotedetails_test")flow.run()ale
12/28/2021, 1:01 PMtc = table_creation()
el = extract_load()
t = transform()
t.set_upstream(el)
el.set_upstream(tc)ale
12/28/2021, 1:02 PMale
12/28/2021, 1:03 PMAndrea Nerla
12/28/2021, 1:04 PMale
12/28/2021, 1:11 PMBring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by