Chohang Ng
05/27/2021, 10:15 PMfrom prefect import Task
import pandas as pd
import os,sys
from prefect.utilities.tasks import task
import db as db
from prefect import task, Flow, Parameter
import prefect
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor
class ETL(Task):
def __init__(self):
self.df = self.extract()
def extract(self):
read_conn = db.read_conn
query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
from hq.oproducts p
JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
WHERE b.oproduct_id = 5801"""
df = pd.read_sql(query,read_conn)
return df
def load(self):
self.df.to_csv(r"C:\Users\<http://cho.ng|cho.ng>\test\df.csv",index=False)
with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
df = ETL()
df.load()
flow.register(project_name="tester")
Kevin Kho
ETL(Task)
, you need to change the extract
to run
. The Flow
calls the run
method of a task. So when you use ETL()
, it is looking for run()
, but it doesn’t exist because you have it as extract()
run
method. And then last in order to use it in the Flow, ETL()
is just calling the init. You want to change ETL()
to ETL()()
, and the second ()
will call the run method.class Extract(Task):
def run(self):
read_conn = db.read_conn
query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
from hq.oproducts p
JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
WHERE b.oproduct_id = 5801"""
df = pd.read_sql(query,read_conn)
return df
class Load(Task):
def __init__(self, df):
self.df = df
super().__init__()
def run(self):
self.df.to_csv(r"C:\Users\<http://cho.ng|cho.ng>\test\df.csv",index=False)
with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
df = Extract()()
Load(df)()
flow.register(project_name="tester")
Chohang Ng
05/28/2021, 3:01 PMKevin Kho
class Load(Task):
def run(self, df):
df.to_csv(r"C:\Users\<http://cho.ng|cho.ng>\test\df.csv",index=False)
with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
df = Extract()()
Load()(df)
run
method.Chohang Ng
05/28/2021, 3:25 PMKevin Kho
run
method that is called inside the flow. If you want everything in one Task
, you can have def extract()
and def load()
, and then you can have a def run()
that calls both of them.Chohang Ng
05/28/2021, 3:30 PMKevin Kho
class ETL(Task):
def extract(self):
read_conn = db.read_conn
query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
from hq.oproducts p
JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
WHERE b.oproduct_id = 5801"""
df = pd.read_sql(query,read_conn)
return df
def load(self):
self.df.to_csv(r"C:\Users\<http://cho.ng|cho.ng>\test\df.csv",index=False)
def run(self):
df = self.extract()
self.load()
with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
ETL()()
Chohang Ng
05/28/2021, 3:33 PMKevin Kho
ETL()()
. The first one is the init
, the second is the run