Andrea Nerla
12/28/2021, 12:53 PM
Andrea Nerla
12/28/2021, 12:54 PM
import prefect
import os
import pyodbc
from prefect import task, Flow
@task
def table_creation():
    """Create the quote-details table by running its SQL script.

    Reads the table-creation script from disk, executes it on the local
    SQL Server ``test`` database through pyodbc, and commits the result.

    Raises:
        OSError: if the SQL script cannot be opened or read.
        pyodbc.Error: if connecting to or executing against the database
            fails (the connection is still closed in that case).
    """
    # Read the script before connecting so that a missing file does not
    # leave a database connection open.
    with open(r"C:\Users\andrea.nerla\Desktop\sidal\sql sidal 2\quote details table creazione.sql", 'r') as script:
        sql = script.read()

    # NOTE(review): credentials are hard-coded in the connection string;
    # consider moving them to configuration or a secret store.
    # The server part is a raw string so the backslash is taken literally
    # rather than relying on the invalid escape sequence "\s".
    conn = pyodbc.connect(
        'Driver={SQL Server};'
        r'Server=localhost\sqlserver;'
        'Database=test;'
        'user=prova;'
        'password=provaprova;'
    )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            conn.commit()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when execute/commit raised.
        conn.close()
@task
def extract_load():
    """Run the external ``test_quotedetails.py`` script through the system shell.

    Returns:
        The value returned by ``os.system`` for the command.
    """
    # The whole command is wrapped in double quotes, exactly as the shell
    # receives it.
    command = r'"python C:\Users\andrea.nerla\Desktop\sidal\python_per_test\test_quotedetails.py"'
    exit_status = os.system(command)
    return exit_status
@task
def transform():
    """Read the quote-details stored-procedure SQL script from disk.

    Only the file read is active. Executing the script against SQL Server
    is disabled (see the commented-out lines below), so this task has no
    database effect and returns ``None``.

    Raises:
        OSError: if the SQL script cannot be opened or read.
    """
    # logger = prefect.context.get("logger")
    # conn = pyodbc.connect('Driver={SQL Server};' 'Server=localhost\sqlserver;' 'Database=test;' 'user=prova;' 'password=provaprova;')
    # cursor = conn.cursor()
    # The context manager closes the file even if read() raises.
    with open(r"C:\Users\andrea.nerla\Desktop\sidal\sql sidal 2\quote details stored proc.sql", 'r') as script:
        sql = script.read()  # only consumed by the disabled execution step below
    # cursor.execute(sql)
    # conn.commit()
    # conn.close()
# Build the flow. Calling tasks inside the Flow context only adds them to
# the flow; with no data passed between them, Prefect has no ordering
# information, so the dependencies are declared explicitly.
with Flow("quotedetails_flow") as flow:
    tc = table_creation()
    el = extract_load()
    t = transform()
    # Required order: create table -> extract/load -> transform.
    el.set_upstream(tc)
    t.set_upstream(el)

# Register the flow under the project, then execute it locally.
flow.register(project_name="quotedetails_test")
flow.run()
ale
12/28/2021, 1:01 PM
tc = table_creation()
# Add the remaining two tasks; these calls are meant to run inside the
# flow's ``with Flow(...)`` context.
load_step = extract_load()
transform_step = transform()
# Declare the ordering explicitly: tc -> load_step -> transform_step.
load_step.set_upstream(tc)
transform_step.set_upstream(load_step)
ale
12/28/2021, 1:02 PM
ale
12/28/2021, 1:03 PM
Andrea Nerla
12/28/2021, 1:04 PM
ale
12/28/2021, 1:11 PM