    Chohang Ng

    1 year ago
    I am having trouble using prefect.io: flows run successfully, but nothing actually happens. Here is what I mean. 1. I registered with Cloud successfully using an API key generated from a service account. 2. The upload to the database happened when I registered the flows. 3. But when a flow runs on schedule, the upload to the database doesn't happen, even though the UI shows success on all the flows. Any pointers? I appreciate all your help!

    Kevin Kho

    1 year ago
    Hi @Chohang Ng! Sounds like you’re on Prefect server? What do you mean the upload to the database didn’t happen in number 3?

    Chohang Ng

    1 year ago
    I am doing ETL. It seems like none of the tasks actually run when they are on schedule or when I manually hit Quick Run in the prefect.io interface. They did run when I registered them, though.

    Kevin Kho

    1 year ago
    Does the Flow get stuck in a Scheduled state in the UI? Or is it labelled as a success?

    Chohang Ng

    1 year ago
    It said success. all green

    Kevin Kho

    1 year ago
    This looks like it ran. Did your agent pick it up?

    Chohang Ng

    1 year ago
    yes,
    The screenshot was taken soon after I hit quick run

    Kevin Kho

    1 year ago
    This looks like it ran. What makes you say the tasks didn't run? Do you see logs in the Flow in the UI?

    Chohang Ng

    1 year ago
    I checked the output of the ETL.
    No output was generated after the flow runs. For example, one task just saves a dataframe as a csv file, and no file was written.
    But the csv file was generated when I registered the flows. I could actually see the output that time.

    Kevin Kho

    1 year ago
    How are you registering? Registration should not be running the flow. Do you have flow.run() in your Python script that contains the Flow?

    Chohang Ng

    1 year ago
    import pendulum ## for crons timestamp
    from prefect import Flow
    from prefect.schedules import CronSchedule
    from prefect.tasks.prefect import StartFlowRun ## flow of flows
    from prefect import task, Flow, Parameter
    import prefect
    
    from flow_1 import *
    from flow_2 import *
    
    from prefect.agent.local import LocalAgent
    from prefect.run_configs import LocalRun
    from prefect.executors import LocalDaskExecutor
    
    weekday_schedule = CronSchedule(
        "1 1 1 * *", start_date=pendulum.now(tz="US/Mountain")
    )
    
    
    flow_1_flow = StartFlowRun(flow_name='flow_1',project_name='tester',wait = True)
    flow_2_flow = StartFlowRun(flow_name='flow_2',project_name='tester',wait = True)
    
    
    with Flow("main-flow", schedule=weekday_schedule,executor=LocalDaskExecutor(), 
               run_config=LocalRun()) as flow:
        flow_1_flow.set_upstream(flow_2_flow)
        
    flow.register(project_name='tester')
    import pandas as pd
    import numpy as np
    
    import os,sys
    
    from prefect.utilities.tasks import task
    import db as db
    from prefect import task, Flow, Parameter
    import prefect
    from prefect.run_configs import LocalRun
    from prefect.executors import LocalDaskExecutor
    
    read_conn = db.read_conn
    
    
    def extract():
        query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
    from hq.oproducts p 
    JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
    WHERE b.oproduct_id = 7962"""
        df = pd.read_sql(query,read_conn)
        return df
        
    
    def load(df):
        df.to_csv("df.csv",index=False)
    
    
    with Flow('flow_2',executor=LocalDaskExecutor(), 
               run_config=LocalRun()) as flow:
        df = extract()
        load(df)
    flow.register(project_name="tester")
    The second script above (starting at import pandas as pd) is flow_2.py

    Kevin Kho

    1 year ago
    You need to call the StartFlowRun inside the Flow like this:
    flow_1_flow = StartFlowRun(flow_name='flow_1',project_name='tester',wait = True)
    flow_2_flow = StartFlowRun(flow_name='flow_2',project_name='tester',wait = True)
    with Flow("main-flow", schedule=weekday_schedule,executor=LocalDaskExecutor(), 
               run_config=LocalRun()) as flow:
        a = flow_1_flow()
        b = flow_2_flow()
        a.set_upstream(b)
    This will run them

    Chohang Ng

    1 year ago
    The first snippet is flow_flow.py. That is the script I run with "python flow_flow.py" to register.
    Let me try.
    Still no. The csv was generated only when I registered.

    Kevin Kho

    1 year ago
    You re-registered and ran?

    Chohang Ng

    1 year ago
    yes
    import pendulum ## for crons timestamp
    from prefect import Flow
    from prefect.schedules import CronSchedule
    from prefect.tasks.prefect import StartFlowRun ## flow of flows
    from prefect import task, Flow, Parameter
    import prefect
    
    from flow_1 import *
    from flow_2 import *
    
    from prefect.agent.local import LocalAgent
    from prefect.run_configs import LocalRun
    from prefect.executors import LocalDaskExecutor
    
    weekday_schedule = CronSchedule(
        "1 1 1 * *", start_date=pendulum.now(tz="US/Mountain")
    )
    
    
    flow_1_flow = StartFlowRun(flow_name='flow_1',project_name='tester',wait = True)
    flow_2_flow = StartFlowRun(flow_name='flow_2',project_name='tester',wait = True)
    
    
    with Flow("main-flow", schedule=weekday_schedule,executor=LocalDaskExecutor(), 
               run_config=LocalRun()) as flow:
        a = flow_1_flow()
        b = flow_2_flow()
        a.set_upstream(b)
        
    flow.register(project_name='tester')

    Kevin Kho

    1 year ago
    flow_2.py looks a bit off. Can you make extract() and load() into tasks by adding the @task decorator?

    Chohang Ng

    1 year ago
    Yeah, but when I did, it gave me an error: TypeError: cannot pickle 'socket' object. I wasn't able to figure out what was going on.
    flow_1.py has @task included and didn't throw that error. Maybe that's what I should do?

    Kevin Kho

    1 year ago
    Can you move read_conn = db.read_conn into extract?
    If this is an open connection to the database, you want it to be created inside the task
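
The pickling error above is typical of objects that wrap an open socket: they generally cannot be serialized, so a connection held at module level gets in the way when the task has to be pickled. The sketch below reproduces the failure with the standard library only and shows the alternative of opening the connection inside the function that uses it. It uses sqlite3 as a stand-in for the real database, and the helper name can_pickle and the in-memory table are assumptions made for illustration, not part of the thread's code.

```python
import pickle
import socket
import sqlite3


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# An open socket (which many database connections wrap) cannot be pickled.
sock = socket.socket()
socket_picklable = can_pickle(sock)
sock.close()


def extract(db_path=":memory:"):
    """Open the connection inside the function, use it, and close it again."""
    conn = sqlite3.connect(db_path)  # stand-in for the real database connection
    try:
        conn.execute("CREATE TABLE t (x INTEGER)")
        conn.execute("INSERT INTO t VALUES (7962)")
        return conn.execute("SELECT x FROM t").fetchall()
    finally:
        conn.close()


rows = extract()
print("socket picklable:", socket_picklable)
print("rows picklable:", can_pickle(rows))
```

Only plain data (the returned rows) leaves the function, and plain data pickles without trouble.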

    Chohang Ng

    1 year ago
    It solved that TypeError, but after I re-registered and re-ran it, I still got no output.
    Now the csv output is not being generated when I register the flows.

    Kevin Kho

    1 year ago
    That’s the correct behavior. The code should not be running when you register the flows. Execution of tasks is deferred until the flow actually runs. The csv should be produced when you run the flow, not at registration.
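
To make the difference between build time and run time concrete, here is a toy sketch in plain Python. MiniFlow and mini_task are hypothetical stand-ins written for this illustration; they are not Prefect's API and only mimic the idea that a decorated function called inside the flow block is recorded rather than executed, while an undecorated function runs immediately (which is why the csv appeared at registration).

```python
class MiniFlow:
    """Toy stand-in for a flow: collects deferred steps inside a `with` block."""

    _active = None  # the flow currently being built, if any

    def __init__(self, name):
        self.name = name
        self.steps = []

    def __enter__(self):
        MiniFlow._active = self
        return self

    def __exit__(self, *exc):
        MiniFlow._active = None
        return False

    def run(self):
        """Execute every recorded step, in the order it was recorded."""
        return [func(*args) for func, args in self.steps]


def mini_task(func):
    """Toy decorator: inside a MiniFlow block, record the call instead of running it."""

    def wrapper(*args):
        flow = MiniFlow._active
        if flow is None:
            return func(*args)  # outside a flow block, behave like a normal call
        flow.steps.append((func, args))
        return None

    return wrapper


log = []


def plain_load():
    log.append("plain_load ran")


@mini_task
def deferred_load():
    log.append("deferred_load ran")


with MiniFlow("demo") as flow:
    plain_load()      # undecorated: executes right now, while the flow is built
    deferred_load()   # decorated: only recorded

log_after_build = list(log)
flow.run()
log_after_run = list(log)
print(log_after_build)
print(log_after_run)
```

After the block is built, only the undecorated function has run; the decorated one runs when the flow itself is run.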

    Chohang Ng

    1 year ago
    Yes, I see. But it still doesn't produce the expected csv when I run the flow, even though the run was a success. What might have gone wrong?

    Kevin Kho

    1 year ago
    You registered flow_2.py again and re-ran the main-flow?

    Chohang Ng

    1 year ago
    I only re-registered flow_flow.py. Let me re-register everything.

    Kevin Kho

    1 year ago
    You need to register flow_2.py, where you made the edits to extract and load, so that StartFlowRun runs the updated flow_2.

    Chohang Ng

    1 year ago
    I just did. Still no csv
    Here is the flow_2.py
    import pandas as pd
    import numpy as np
    
    import os,sys
    
    from prefect.utilities.tasks import task
    import db as db
    from prefect import task, Flow, Parameter
    import prefect
    from prefect.run_configs import LocalRun
    from prefect.executors import LocalDaskExecutor
    
    
    
    @task
    def extract():
        read_conn = db.read_conn
        query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
    from hq.oproducts p 
    JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
    WHERE b.oproduct_id = 7962"""
        df = pd.read_sql(query,read_conn)
        return df
        
    @task
    def load(df):
        df.to_csv("df.csv",index=False)
    
    
    with Flow('flow_2',executor=LocalDaskExecutor(), 
               run_config=LocalRun()) as flow:
        df = extract()
        load(df)
    flow.register(project_name="tester")

    Kevin Kho

    1 year ago
    I think this might be because the Flow is being run in a different path than you’re expecting. Can you save that csv with an absolute path?
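
A relative path such as "df.csv" is resolved against the working directory of whatever process executes the flow, which need not be the folder that holds flow_2.py. A minimal sketch of pinning the output to a fixed directory follows; the helper name resolve_output and the temp-directory location are assumptions chosen for illustration, and a real flow would point at its own output folder.

```python
import tempfile
from pathlib import Path


def resolve_output(filename, base_dir):
    """Return an absolute path for filename under base_dir, creating base_dir if needed."""
    base = Path(base_dir).expanduser().resolve()
    base.mkdir(parents=True, exist_ok=True)
    return base / filename


# Where a bare relative path would land: it depends on the current working directory.
relative_target = Path("df.csv").resolve()

# Hypothetical fixed output directory, independent of the working directory.
output_dir = Path(tempfile.gettempdir()) / "prefect_etl_demo"
target = resolve_output("df.csv", output_dir)
target.write_text("oproduct_id\n7962\n")

print("relative path resolves to:", relative_target)
print("fixed output path:", target)
```

Printing or logging the resolved path from inside the task is also a quick way to find out where a "missing" file actually went.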

    Chohang Ng

    1 year ago
    sure
    It works! Thank you! That's awesome!

    Kevin Kho

    1 year ago
    Nice! Glad we got it working! 👍

    Chohang Ng

    1 year ago
    @Kevin Kho Could you take a look at this as well by any chance? It has the same problem when I use a Task subclass: the csv file is only generated during the registration process.
    from prefect import Task
    import pandas as pd
    
    import os,sys
    
    from prefect.utilities.tasks import task
    import db as db
    from prefect import task, Flow, Parameter
    
    import prefect
    from prefect.run_configs import LocalRun
    from prefect.executors import LocalDaskExecutor
    
    
    class ETL(Task):
        def __init__(self):
            self.df  = self.extract()
    
        def extract(self):
            read_conn = db.read_conn
            query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
    from hq.oproducts p 
    JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
    WHERE b.oproduct_id = 5801"""
            df = pd.read_sql(query,read_conn)
            return df
    
    
        def load(self):
            self.df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)
    
    with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
        df = ETL()
        df.load()
    
    flow.register(project_name="tester")
    @Kevin Kho https://docs.prefect.io/core/concepts/tasks.html#overview Based on my understanding, if I want the equivalent of @task for a class, I just need to subclass Task.

    Kevin Kho

    1 year ago
    Answering you in the other thread in server
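
The answer itself lives in the other thread, so the following is only a general sketch of why the flow_3 snippet does its work at registration: anything placed in __init__ executes as soon as the object is constructed, which happens while the flow is being built. The linked docs page describes Task subclasses as putting their work in a run() method. The Task class below is a bare stand-in defined for this example, not prefect.Task, and the class names EagerETL and DeferredETL are hypothetical.

```python
class Task:
    """Bare stand-in base class for illustration; this is not prefect.Task."""

    def run(self):
        raise NotImplementedError


events = []


class EagerETL(Task):
    def __init__(self):
        # Work placed here happens at construction, i.e. while the flow is built.
        events.append("EagerETL: extract during __init__")


class DeferredETL(Task):
    def __init__(self, product_id):
        # Only store configuration here; do not touch the database yet.
        self.product_id = product_id

    def run(self):
        # The actual work happens only when run() is invoked.
        events.append(f"DeferredETL: extract+load for {self.product_id}")
        return self.product_id


eager = EagerETL()
deferred = DeferredETL(product_id=5801)
events_after_build = list(events)

result = deferred.run()
print(events_after_build)
print(events)
```

With a real Prefect Task subclass, the constructor would normally also call the parent constructor; that detail is left out of the stand-in.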