Chohang Ng

05/27/2021, 10:15 PM
from prefect import Task
import pandas as pd
import os,sys
from prefect.utilities.tasks import task
import db as db
from prefect import task, Flow, Parameter
import prefect
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor
class ETL(Task):
    def __init__(self):
        self.df  = self.extract()
    def extract(self):
        read_conn = db.read_conn
        query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
from hq.oproducts p 
JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
WHERE b.oproduct_id = 5801"""
        df = pd.read_sql(query,read_conn)
        return df
    def load(self):
        self.df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)
with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
    df = ETL()
    df.load()
flow.register(project_name="tester")

Kevin Kho

05/27/2021, 10:41 PM
In the ETL(Task) class, you need to rename the extract method to run. The Flow calls the run method of a task, so when you use ETL(), it looks for run(), which doesn't exist because you named it extract(). I don't think you need the init either; just let the Flow call the new run method. Last, to use it in the Flow: ETL() only calls the init. You want to change ETL() to ETL()(). The second () calls the task, which adds it to the Flow so that its run method is executed when the flow runs.
I think this should work:
class Extract(Task):

    def run(self):
        read_conn = db.read_conn
        query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
        from hq.oproducts p 
        JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
        WHERE b.oproduct_id = 5801"""
        df = pd.read_sql(query,read_conn)
        return df

class Load(Task):
    def __init__(self, df):
        self.df = df
        super().__init__()

    def run(self):
        self.df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)

with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
    df = Extract()()
    Load(df)()

flow.register(project_name="tester")
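To make the two pairs of parentheses concrete, here is a stripped-down, plain-Python stand-in. The class MiniTask is hypothetical and is not Prefect's implementation. The first () runs __init__ and only builds the object. The second () goes through __call__, which in this toy forwards straight to run. In real Prefect 1.x, calling a task inside a `with Flow(...)` block instead registers it with the flow, and run executes later during the flow run. The toy also mirrors the original question: a subclass that only defines extract() has no run() of its own to execute.

```python
class MiniTask:
    """Hypothetical toy base class; not Prefect's actual Task implementation."""

    def __init__(self, name="task"):
        # First (): only configures the object, nothing is executed yet.
        self.name = name

    def __call__(self, *args, **kwargs):
        # Second (): this toy simply forwards to run().
        return self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        raise NotImplementedError("subclasses must define run()")


class Extract(MiniTask):
    def run(self):
        # Placeholder row standing in for the SQL query result.
        return [{"oproduct_id": 5801}]


class ETLWithoutRun(MiniTask):
    # Mirrors the original question: the logic lives in extract(), not run().
    def extract(self):
        return [{"oproduct_id": 5801}]


task_obj = Extract()   # only __init__ has executed
rows = Extract()()     # __init__, then __call__ -> run()
print(type(task_obj).__name__, rows)

try:
    ETLWithoutRun()()
    missing_run_error = None
except NotImplementedError as exc:
    missing_run_error = str(exc)
print("error:", missing_run_error)
```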

Chohang Ng

05/28/2021, 3:01 PM
Unexpected error: AttributeError("'Extract' object has no attribute 'to_csv'")
The Load class failed.

Kevin Kho

05/28/2021, 3:05 PM
OK, let me think.
Let me do a quick test on my end and get back to you.
Change to this:
class Load(Task):

    def run(self, df):
        df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)

with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
    df = Extract()()
    Load()(df)
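This moves the df to runtime. Passing df as an argument to the task call makes it an input of the Load task, so when the flow runs, the DataFrame returned by Extract is handed to the run method. In the earlier version, Load(df) stored df in the init at build time, when df was still the Extract task object rather than a DataFrame, which is why it had no to_csv.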
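Why the first version failed and this one works can be shown with a toy deferred-execution model in plain Python. All names here (MiniFlow, BrokenLoad) are hypothetical and far simpler than Prefect's real machinery. At build time, adding a task returns the task itself, not its result. BrokenLoad captures that task object in __init__, like Load(df), and fails with the same kind of AttributeError. Load receives df through run(), like Load()(df), and the flow substitutes the real upstream result before calling it.

```python
class MiniFlow:
    """Hypothetical toy flow: records tasks at build time, runs them later."""

    def __init__(self):
        self.steps = []  # (task, upstream_tasks) in the order they were added

    def add(self, task, upstream):
        self.steps.append((task, upstream))
        return task  # build time hands back the task itself, not its result

    def run(self):
        results = {}
        for task, upstream in self.steps:
            # Swap each upstream task for the value it produced earlier.
            args = [results[id(up)] for up in upstream]
            results[id(task)] = task.run(*args)
        return results


class Extract:
    def run(self):
        return ["row1", "row2"]  # stands in for the DataFrame


class BrokenLoad:
    def __init__(self, df):
        self.df = df  # captured at build time: this is the Extract task

    def run(self):
        return self.df.to_csv("df.csv")  # the task object has no to_csv


class Load:
    def run(self, df):
        return len(df)  # receives the real upstream result at run time


flow = MiniFlow()
df = flow.add(Extract(), [])          # df is the Extract task, not data
load_task = flow.add(Load(), [df])    # dependency recorded, like Load()(df)
good_results = flow.run()
print(good_results[id(load_task)])

broken = MiniFlow()
df2 = broken.add(Extract(), [])
broken.add(BrokenLoad(df2), [])       # like Load(df)(): no dependency recorded
try:
    broken.run()
    broken_error = None
except AttributeError as exc:
    broken_error = str(exc)
print(broken_error)
```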

Chohang Ng

05/28/2021, 3:25 PM
It works!
My question is: do I have to do it this way, which limits me to using only the run method in a class?

Kevin Kho

05/28/2021, 3:30 PM
So the Task has a run method that is called inside the flow. If you want everything in one Task, you can have def extract() and def load(), and then a def run() that calls both of them.

Chohang Ng

05/28/2021, 3:30 PM
If I need extra steps, such as multiple transforms before the load, would I need to put them in separate classes?
oh

Kevin Kho

05/28/2021, 3:31 PM
We normally split these out into multiple tasks because they do different things, but that's up to you for sure.
This might work for you:
class ETL(Task):

    def extract(self):
        read_conn = db.read_conn
        query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
                from hq.oproducts p 
                JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
                WHERE b.oproduct_id = 5801"""
        df = pd.read_sql(query,read_conn)
        return df

    def load(self, df):
        # Take the DataFrame as an argument; self.df is never set in this class.
        df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)

    def run(self):
        df = self.extract()
        self.load(df)

with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
    ETL()()
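Following the single-Task pattern above, the extra transform steps asked about can be more methods that run() chains together. Below is a plain-Python sketch without Prefect, so it runs on its own. The transform methods drop_missing_parents and add_flag and the sample rows are hypothetical. extract and load are stand-ins for the pd.read_sql and to_csv calls. In the flow, the class would subclass Task and be called as ETL()() as before.

```python
class ETL:
    """Plain-Python sketch of one task whose run() chains several steps.

    In the Prefect flow this class would subclass prefect.Task; the
    transform steps and sample rows here are hypothetical.
    """

    def extract(self):
        # Stand-in for pd.read_sql(query, read_conn).
        return [
            {"oproduct_id": 5801, "oproduct_parent_id": None},
            {"oproduct_id": 5801, "oproduct_parent_id": 42},
        ]

    def drop_missing_parents(self, rows):
        # Hypothetical transform 1: keep only rows that have a parent id.
        return [r for r in rows if r["oproduct_parent_id"] is not None]

    def add_flag(self, rows):
        # Hypothetical transform 2: add a derived column to each row.
        return [{**r, "has_parent": True} for r in rows]

    def load(self, rows):
        # Stand-in for df.to_csv(...): report how many rows would be written.
        return len(rows)

    def run(self):
        rows = self.extract()
        # Apply each transform in order before loading.
        for step in (self.drop_missing_parents, self.add_flag):
            rows = step(rows)
        return self.load(rows)


print(ETL().run())
```

Splitting each transform into its own Task instead, as suggested in the thread, gives each step its own state in the flow run. That is usually easier to monitor and retry.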

Chohang Ng

05/28/2021, 3:33 PM
I see the logic. So the run method is the only thing that can be called inside a flow.

Kevin Kho

05/28/2021, 3:35 PM
Yes, exactly. The Flow is calling the run method. In ETL()(), the first () is the init, the second is the run