
    Chohang Ng

    1 year ago
    from prefect import Task
    import pandas as pd
    import os,sys
    from prefect.utilities.tasks import task
    import db as db
    from prefect import task, Flow, Parameter
    import prefect
    from prefect.run_configs import LocalRun
    from prefect.executors import LocalDaskExecutor
    class ETL(Task):
        def __init__(self):
            self.df  = self.extract()
        def extract(self):
            read_conn = db.read_conn
            query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
    from hq.oproducts p 
    JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
    WHERE b.oproduct_id = 5801"""
            df = pd.read_sql(query,read_conn)
            return df
        def load(self):
            self.df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)
    with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
        df = ETL()
        df.load()
    flow.register(project_name="tester")
    Kevin Kho

    1 year ago
    In the `ETL(Task)` class, you need to rename `extract` to `run`. The `Flow` calls the `run` method of a task, so when you use `ETL()`, it looks for `run()`, which doesn't exist because you named it `extract()`.
    I don't think you need the `__init__` either (if you do keep one, it has to call `super().__init__()`). Just let the Flow call the new `run` method. Finally, to use the task in the Flow: `ETL()` only calls `__init__`. Change `ETL()` to `ETL()()`; the second `()` adds the task to the Flow, which then calls the `run` method.
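    In plain Python terms, `ETL()` only builds the object, and the extra `()` invokes the instance's `__call__` method. Below is a minimal sketch using a hypothetical `ToyTask` base class (not Prefect's code). In this toy, `__call__` simply forwards to `run`; in Prefect 1.x the second call instead adds the task to the flow, and the flow calls `run` later. What it shows is that a helper such as `extract` never executes unless `run` calls it.

```python
class ToyTask:
    """Hypothetical stand-in for prefect.Task, for illustration only."""

    def __init__(self):
        # Runs on the FIRST (): building the object does no ETL work.
        self.events = ["__init__"]

    def __call__(self, *args, **kwargs):
        # Runs on the SECOND (): in this toy it just forwards to run().
        self.events.append("__call__")
        return self.run(*args, **kwargs)


class ETL(ToyTask):
    def extract(self):
        # A helper method: nothing calls it unless run() does.
        self.events.append("extract")
        return [5801]

    def run(self):
        self.events.append("run")
        return self.extract()


task_obj = ETL()        # first (): only __init__ runs
result = task_obj()     # second (): __call__ -> run -> extract
print(task_obj.events)  # ['__init__', '__call__', 'run', 'extract']
```

    Without a `run` method, the second `()` would fail with an `AttributeError` in this toy, which mirrors why the original `ETL` with only `extract` did nothing useful in the flow.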
    I think this should work:
    class Extract(Task):
    
        def run(self):
            read_conn = db.read_conn
            query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
            from hq.oproducts p 
            JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
            WHERE b.oproduct_id = 5801"""
            df = pd.read_sql(query,read_conn)
            return df
    
    class Load(Task):
        def __init__(self, df):
            self.df = df
            super().__init__()
    
        def run(self):
            self.df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)
    
    with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
        df = Extract()()
        Load(df)()
    
    flow.register(project_name="tester")

    Chohang Ng

    1 year ago
    Unexpected error: AttributeError("'Extract' object has no attribute 'to_csv'")
    The `Load` class failed.
    Kevin Kho

    1 year ago
    Okay, let me think.
    Let me do a quick test on my end and get back to you.
    Change to this:
    class Load(Task):
    
        def run(self, df):
            df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)
            
    with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
        df = Extract()()
        Load()(df)
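    Passing `df` to the `run` method (instead of to `__init__`) moves it to runtime: the Flow hands `run` the actual DataFrame produced by `Extract`, rather than the `Extract` task object itself.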
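    To see why `Load(df)` raised `AttributeError` while `Load()(df)` works, here is a deliberately simplified model in plain Python. `ToyFlow` and `ToyTask` are hypothetical stand-ins, not Prefect's classes. The assumption they encode is the Prefect 1.x behavior described in this thread: calling a task inside `with Flow(...)` only records it, and upstream results are substituted into `run` arguments when the flow actually runs. A value handed to `__init__` is captured while the flow is being built, so it is the `Extract` task object, which has no `to_csv`.

```python
class ToyFlow:
    """Hypothetical stand-in for prefect.Flow: records tasks, runs them later."""

    active = None  # the flow whose `with` block is currently open

    def __init__(self, name):
        self.name = name
        self.steps = []  # (task, args) pairs, in the order they were added

    def __enter__(self):
        ToyFlow.active = self
        return self

    def __exit__(self, *exc_info):
        ToyFlow.active = None
        return False  # do not swallow exceptions

    def run(self):
        results = {}
        for task, args in self.steps:
            # Swap each upstream task argument for the value it produced.
            values = [results[id(a)] if isinstance(a, ToyTask) else a for a in args]
            results[id(task)] = task.run(*values)
        return [results[id(task)] for task, _ in self.steps]


class ToyTask:
    def __call__(self, *args):
        # The second (): register with the open flow instead of running now.
        ToyFlow.active.steps.append((self, args))
        return self  # placeholder for a result that exists only at run time


class Extract(ToyTask):
    def run(self):
        return {"oproduct_id": [5801]}  # stands in for the DataFrame


class LoadViaInit(ToyTask):
    def __init__(self, df):
        self.df = df  # captured at build time: an Extract task, not data

    def run(self):
        return type(self.df).__name__


class LoadViaRun(ToyTask):
    def run(self, df):
        return df  # the flow passes in Extract's actual result


with ToyFlow("broken") as broken:
    df = Extract()()
    LoadViaInit(df)()

with ToyFlow("fixed") as fixed:
    df = Extract()()
    LoadViaRun()(df)

print(broken.run())  # [{'oproduct_id': [5801]}, 'Extract']
print(fixed.run())   # [{'oproduct_id': [5801]}, {'oproduct_id': [5801]}]
```

    In the broken flow, `self.df` is still the `Extract` task when `run` executes, so calling `.to_csv` on it would fail just like the error reported above.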

    Chohang Ng

    1 year ago
    It works!
    My question is: do I have to do it this way? It limits me to using only the `run` method in a class.
    Kevin Kho

    1 year ago
    So the `Task` has a `run` method that is called inside the flow. If you want everything in one `Task`, you can have `def extract()` and `def load()`, and then a `def run()` that calls both of them.

    Chohang Ng

    1 year ago
    If I need extra steps, such as multiple transforms before the load, would I need to put them in separate classes?
    oh
    Kevin Kho

    1 year ago
    We normally split these out into multiple tasks because they do different things, but that's up to you for sure.
    This might work for you:
    class ETL(Task):
    
        def extract(self):
            read_conn = db.read_conn
            query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
                    from hq.oproducts p 
                    JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
                    WHERE b.oproduct_id = 5801"""
            df = pd.read_sql(query,read_conn)
            return df
    
        def load(self, df):
            # Receive the DataFrame from run(); this class never sets self.df.
            df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)
    
        def run(self):
            df = self.extract()
            self.load(df)
    
    with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
        ETL()()
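    The same single-class idea extends to extra transform steps: `run` just chains them between `extract` and `load`. Below is a runnable, stdlib-only sketch under these assumptions: an in-memory `sqlite3` database stands in for `db.read_conn` (so the `hq.` schema prefix is dropped and the query uses `AS` aliases), lists of dicts stand in for the DataFrame, a temporary file replaces the Windows path, and the class is plain Python rather than a Prefect `Task`, so `run()` is called directly. The two transforms are hypothetical examples.

```python
import csv
import os
import sqlite3
import tempfile


class ETL:
    """Plain-Python sketch: run() chains extract -> transforms -> load."""

    def __init__(self, conn, out_path):
        self.conn = conn
        self.out_path = out_path

    def extract(self):
        query = """SELECT b.oproduct_id AS oproduct_id,
                          p.oproduct_parent_id AS oproduct_parent_id,
                          b.obundle_parent_id AS obundle_parent_id
                   FROM oproducts p
                   JOIN obundles b ON b.oproduct_id = p.oproduct_id
                   WHERE b.oproduct_id = ?"""
        cur = self.conn.execute(query, (5801,))
        cols = [c[0] for c in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]

    def drop_missing_bundle_parent(self, rows):
        # Hypothetical transform 1: keep rows that have a bundle parent.
        return [r for r in rows if r["obundle_parent_id"] is not None]

    def add_is_child(self, rows):
        # Hypothetical transform 2: derive a new column.
        return [{**r, "is_child": r["oproduct_parent_id"] is not None} for r in rows]

    def load(self, rows):
        if not rows:
            return 0  # nothing to write
        with open(self.out_path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0]))
            writer.writeheader()
            writer.writerows(rows)
        return len(rows)

    def run(self):
        rows = self.extract()
        rows = self.drop_missing_bundle_parent(rows)
        rows = self.add_is_child(rows)
        return self.load(rows)


# Usage with made-up sample data.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE oproducts (oproduct_id INTEGER, oproduct_parent_id INTEGER);
    CREATE TABLE obundles (oproduct_id INTEGER, obundle_parent_id INTEGER);
    INSERT INTO oproducts VALUES (5801, 100);
    INSERT INTO obundles VALUES (5801, 42), (5801, NULL);
""")
out_path = os.path.join(tempfile.mkdtemp(), "df.csv")
written = ETL(conn, out_path).run()
print(written)  # 1
```

    Keeping every step in one class is convenient, but each step then succeeds or fails as a single unit; splitting them into separate tasks, as suggested above, lets the flow track and retry each step on its own.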

    Chohang Ng

    1 year ago
    I see the logic. So the run method is the only thing that can be called inside a flow.
    Kevin Kho

    1 year ago
    Yes, exactly. The Flow calls the `run` method. In `ETL()()`, the first `()` calls `__init__`, and the second adds the task to the Flow, which then calls `run`.