I am having trouble using prefect.io as flows ca...
# ask-community
c
I am having trouble using prefect.io: flows run successfully, but nothing actually happens. Here is what I mean:
1. I connected to Prefect Cloud successfully, using an API key generated from a service account.
2. The upload to the database happened when I registered the flows.
3. But when the flows ran on schedule, the upload to the database didn't happen, even though the UI shows success for all of them.
Any pointers? I appreciate all your help!
k
Hi @Chohang Ng! Sounds like you’re on Prefect server? What do you mean the upload to the database didn’t happen in number 3?
c
I am doing ETL. It seems like none of the tasks ran at all, either on schedule or when I manually hit Quick Run in the prefect.io interface. They did run when I registered them, though.
k
Does the Flow get stuck in a Scheduled state in the UI? Or is it labelled as a success?
c
It says success. All green.
k
This looks like it ran. Did your agent pick it up?
c
Yes.
The screenshot was taken soon after I hit Quick Run.
k
This looks like it ran. What makes you say the tasks didn't run? Do you see logs for the Flow in the UI?
c
I checked the output of the ETL.
No output was generated after the flows ran. For example, one task just saves a DataFrame as a CSV file, and no file was written.
But the CSV file was generated when I registered them. I could actually see the output at that point.
k
How are you registering? Registration should not be running the flow. Do you have flow.run() in the Python script that contains the Flow?
c
```python
import pendulum  # for cron timestamps
import prefect
from prefect import Flow, Parameter, task
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun  # flow of flows

from flow_1 import *
from flow_2 import *

from prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor

# "1 1 1 * *" fires at 01:01 on the 1st day of each month
weekday_schedule = CronSchedule(
    "1 1 1 * *", start_date=pendulum.now(tz="US/Mountain")
)

flow_1_flow = StartFlowRun(flow_name="flow_1", project_name="tester", wait=True)
flow_2_flow = StartFlowRun(flow_name="flow_2", project_name="tester", wait=True)

with Flow("main-flow", schedule=weekday_schedule, executor=LocalDaskExecutor(),
          run_config=LocalRun()) as flow:
    flow_1_flow.set_upstream(flow_2_flow)

flow.register(project_name="tester")
```
```python
import pandas as pd
import numpy as np

import os
import sys

import db as db  # local module that provides read_conn
import prefect
from prefect import Flow, Parameter, task
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor

read_conn = db.read_conn


def extract():
    query = """SELECT b.oproduct_id, p.oproduct_id, p.oproduct_parent_id, b.obundle_parent_id
FROM hq.oproducts p
JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
WHERE b.oproduct_id = 7962"""
    df = pd.read_sql(query, read_conn)
    return df


def load(df):
    df.to_csv("df.csv", index=False)


with Flow("flow_2", executor=LocalDaskExecutor(),
          run_config=LocalRun()) as flow:
    # extract() and load() are plain functions here (no @task), so these
    # calls execute immediately while this script builds the flow.
    df = extract()
    load(df)

flow.register(project_name="tester")
```
The second one is flow_2.py.
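Both scripts above do their work at module level, and flow_flow.py pulls in the flow files with from flow_1 import * and from flow_2 import *. Importing a module executes its top-level statements, so running flow_flow.py also re-runs flow_2.py's flow.register(...) call and, in this undecorated version, its extract() and load() calls. The stdlib sketch below illustrates that mechanism with a hypothetical throwaway module named flow_demo; it is not Prefect code. Putting registration under an if __name__ == "__main__": guard is one common way to keep it from running on import.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path

# Hypothetical stand-in for a flow module such as flow_2.py.
DEMO_SOURCE = textwrap.dedent('''
    events = []
    events.append("top-level code ran")   # executes on every import

    if __name__ == "__main__":
        events.append("main block ran")    # executes only via `python flow_demo.py`
''')


def import_fresh(module_name: str, source: str):
    """Write `source` to a temp dir and import it as a brand-new module."""
    tmp_dir = tempfile.mkdtemp()
    Path(tmp_dir, f"{module_name}.py").write_text(source)
    sys.path.insert(0, tmp_dir)
    try:
        importlib.invalidate_caches()
        sys.modules.pop(module_name, None)  # make sure we do not reuse a cached copy
        return importlib.import_module(module_name)
    finally:
        sys.path.remove(tmp_dir)


demo = import_fresh("flow_demo", DEMO_SOURCE)
print(demo.events)  # ['top-level code ran']
```

The guarded line never runs on import because the imported module's __name__ is "flow_demo", not "__main__".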
k
You need to call the StartFlowRun inside the Flow like this:
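```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import LocalRun
from prefect.tasks.prefect import StartFlowRun

# weekday_schedule is the CronSchedule defined in flow_flow.py
flow_1_flow = StartFlowRun(flow_name="flow_1", project_name="tester", wait=True)
flow_2_flow = StartFlowRun(flow_name="flow_2", project_name="tester", wait=True)
with Flow("main-flow", schedule=weekday_schedule, executor=LocalDaskExecutor(),
          run_config=LocalRun()) as flow:
    a = flow_1_flow()  # calling the task instance adds it to this flow
    b = flow_2_flow()
    a.set_upstream(b)  # flow_2 runs (and, with wait=True, finishes) before flow_1
```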
This will run them
c
The first screenshot is flow_flow.py. That is the script I run with "python flow_flow.py" to register.
Let me try.
Still no luck. The CSV was only generated when I registered.
k
You re-registered and ran?
c
yes
```python
import pendulum  # for cron timestamps
import prefect
from prefect import Flow, Parameter, task
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun  # flow of flows

from flow_1 import *
from flow_2 import *

from prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor

# "1 1 1 * *" fires at 01:01 on the 1st day of each month
weekday_schedule = CronSchedule(
    "1 1 1 * *", start_date=pendulum.now(tz="US/Mountain")
)

flow_1_flow = StartFlowRun(flow_name="flow_1", project_name="tester", wait=True)
flow_2_flow = StartFlowRun(flow_name="flow_2", project_name="tester", wait=True)

with Flow("main-flow", schedule=weekday_schedule, executor=LocalDaskExecutor(),
          run_config=LocalRun()) as flow:
    a = flow_1_flow()
    b = flow_2_flow()
    a.set_upstream(b)  # run flow_2 before flow_1

flow.register(project_name="tester")
```
k
flow_2.py looks a bit off. Can you make extract() and load() into tasks by adding the @task decorator?
c
Yeah, but when I did, it gave me TypeError: cannot pickle 'socket' object, and I wasn't able to figure out what was going on.
flow_1.py has @task and didn't throw that error. Maybe that's what I should do?
k
Can you move read_conn = db.read_conn into extract()? If this is an open connection to the database, you want it to be created inside the task.
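The TypeError presumably appears because registration serializes the flow (Prefect 1.x's default storage pickles it), and a live connection or socket captured by a task cannot be pickled. The stdlib sketch below shows the difference, with sqlite3 standing in for the real database driver and DB_SETTINGS as a hypothetical settings dict; it is an illustration of the pattern, not the poster's db module.

```python
import pickle
import sqlite3


def is_picklable(obj) -> bool:
    """Return True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


# Pattern that fails: an open connection that a task would capture.
open_conn = sqlite3.connect(":memory:")
print(is_picklable(open_conn))  # False

# Pattern that works: only plain settings travel; the connection is opened in the task.
DB_SETTINGS = {"database": ":memory:"}  # hypothetical connection settings
print(is_picklable(DB_SETTINGS))  # True


def extract(settings: dict) -> list:
    """Open, use and close the connection entirely inside the task body."""
    conn = sqlite3.connect(settings["database"])
    try:
        return conn.execute("SELECT 7962 AS oproduct_id").fetchall()
    finally:
        conn.close()


print(extract(DB_SETTINGS))  # [(7962,)]
open_conn.close()
```

Creating the connection inside the task also means each run gets a fresh connection instead of one opened whenever the module happened to be imported.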
c
That solved the TypeError, but after I re-registered and reran it, I still got no output.
Now the CSV isn't generated when I register the flows either.
k
That’s the correct behavior. The code should not be running when you register the flows. Execution of tasks is deferred until the flow actually runs, so the CSV should be produced when you run the flow, not at registration.
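To make the deferred-execution point concrete, here is a toy, pure-Python model of what a flow context does. It is not Prefect's implementation; MiniFlow, Placeholder and this task decorator are hypothetical names. It shows why the undecorated flow_2.py wrote the CSV at registration: a plain function runs the moment it is called inside the with block, while a decorated one is only recorded and runs when the flow runs.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


class Placeholder:
    """Stands in for a task result that does not exist yet."""


class MiniFlow:
    """Toy flow: records decorated calls while building, executes them in run()."""

    active: Optional["MiniFlow"] = None  # flow currently being built, if any

    def __init__(self, name: str) -> None:
        self.name = name
        self.steps: List[Tuple[Placeholder, Callable, tuple]] = []

    def __enter__(self) -> "MiniFlow":
        MiniFlow.active = self
        return self

    def __exit__(self, *exc_info) -> None:
        MiniFlow.active = None

    def run(self) -> None:
        results: Dict[Placeholder, Any] = {}
        for placeholder, fn, args in self.steps:
            # Swap placeholders for the real values produced by earlier steps.
            real_args = [results[a] if isinstance(a, Placeholder) else a for a in args]
            results[placeholder] = fn(*real_args)


def task(fn: Callable) -> Callable:
    """Toy decorator: calling the wrapped function only records a step."""
    def wrapper(*args: Any) -> Placeholder:
        if MiniFlow.active is None:
            raise RuntimeError("toy tasks must be called inside a MiniFlow block")
        placeholder = Placeholder()
        MiniFlow.active.steps.append((placeholder, fn, args))
        return placeholder
    return wrapper


log: List[str] = []


def plain_extract() -> List[int]:
    log.append("plain_extract ran")
    return [1, 2, 3]


@task
def extract() -> List[int]:
    log.append("extract ran")
    return [1, 2, 3]


@task
def load(rows: List[int]) -> None:
    log.append(f"load ran with {rows}")


with MiniFlow("flow_2") as flow:
    plain_extract()   # undecorated: executes immediately, at build time
    rows = extract()  # decorated: only recorded
    load(rows)

print(log)  # ['plain_extract ran']
flow.run()
print(log)  # ['plain_extract ran', 'extract ran', 'load ran with [1, 2, 3]']
```

Steps are executed in the order they were recorded, which is enough here because a placeholder can only refer to a call made earlier in the block.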
c
Yes, I see. But it still doesn't produce the expected CSV when I run the flow, even though the run is marked successful. What else might have gone wrong?
k
You registered flow_2.py again and re-ran main-flow?
c
I only re-registered flow_flow.py. Let me re-register everything.
k
You need to re-register flow_2.py, where you made the edits to extract and load, so that StartFlowRun runs the updated flow_2.
c
I just did. Still no CSV.
Here is flow_2.py:
```python
import pandas as pd
import numpy as np

import os
import sys

import db as db  # local module that provides read_conn
import prefect
from prefect import Flow, Parameter, task
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor


@task
def extract():
    read_conn = db.read_conn  # now looked up inside the task
    query = """SELECT b.oproduct_id, p.oproduct_id, p.oproduct_parent_id, b.obundle_parent_id
FROM hq.oproducts p
JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
WHERE b.oproduct_id = 7962"""
    df = pd.read_sql(query, read_conn)
    return df


@task
def load(df):
    df.to_csv("df.csv", index=False)


with Flow("flow_2", executor=LocalDaskExecutor(),
          run_config=LocalRun()) as flow:
    df = extract()
    load(df)

flow.register(project_name="tester")
```
k
I think this might be because the Flow is being run from a different working directory than you're expecting. Can you save that CSV with an absolute path?
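A relative filename such as "df.csv" is resolved against the current working directory of the process that runs the flow, which need not be the folder the script lives in. The stdlib sketch below simulates that by switching directories; project_dir and run_dir are hypothetical temporary folders, and the same reasoning applies to the path passed to df.to_csv(...).

```python
import os
import tempfile
from pathlib import Path


def write_csv(path) -> Path:
    """Write a tiny CSV to `path` and return the absolute file it produced."""
    target = Path(path)
    target.write_text("oproduct_id\n7962\n")
    return target.resolve()


original_cwd = os.getcwd()
project_dir = Path(tempfile.mkdtemp()).resolve()  # where we expect the CSV
run_dir = Path(tempfile.mkdtemp()).resolve()      # where the process actually runs

os.chdir(run_dir)  # simulate an agent starting the run from another directory
try:
    relative_result = write_csv("df.csv")                # resolved against run_dir
    absolute_result = write_csv(project_dir / "df.csv")  # independent of the cwd
finally:
    os.chdir(original_cwd)

print(relative_result.parent == run_dir)      # True
print(absolute_result.parent == project_dir)  # True
```

The relative write still "succeeds", which matches what the UI reported: the task finished without error, it just wrote the file somewhere unexpected.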
c
sure
It works! Thank you! That's awesome!
k
Nice! Glad we got it working! 👍
c
@Kevin Kho Could you take a look at this as well, by any chance? I have the same problem when I use Task subclasses: the CSV file is only generated during the registration process.
```python
from prefect import Task
import pandas as pd

import os
import sys

import db as db  # local module that provides read_conn
import prefect
from prefect import Flow, Parameter, task
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor


class ETL(Task):
    def __init__(self):
        # Runs as soon as ETL() is constructed, i.e. while the flow is being built.
        self.df = self.extract()

    def extract(self):
        read_conn = db.read_conn
        query = """SELECT b.oproduct_id, p.oproduct_id, p.oproduct_parent_id, b.obundle_parent_id
FROM hq.oproducts p
JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
WHERE b.oproduct_id = 5801"""
        df = pd.read_sql(query, read_conn)
        return df

    def load(self):
        self.df.to_csv(r"C:\Users\cho.ng\test\df.csv", index=False)


with Flow("flow_3", executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
    df = ETL()
    df.load()  # a direct method call, so it also executes at build time

flow.register(project_name="tester")
```
@Kevin Kho https://docs.prefect.io/core/concepts/tasks.html#overview Based on my understanding, if I want a class to act as a task instead of using @task, I just need to subclass Task.
k
Answering you in the other thread in server
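For reference, the documented Prefect 1.x pattern for a Task subclass is to put the work in a run() method and then call the instance inside the with Flow(...) block (for example, etl = ETL() followed by etl()), so that run() is deferred until the flow runs. In flow_3.py the query lives in __init__ and load() is called as an ordinary method, so both execute while the script builds and registers the flow. The pure-Python toy below mirrors that contract; ToyTask, ToyFlow, EagerETL and DeferredETL are hypothetical names, and this is not Prefect's implementation.

```python
from typing import List

calls: List[str] = []


class ToyTask:
    """Toy version of the Task contract: work lives in run(), calling only registers."""

    def __call__(self, flow: "ToyFlow") -> "ToyTask":
        flow.tasks.append(self)  # record the task; run() is not called here
        return self

    def run(self):
        raise NotImplementedError("subclasses put their work in run()")


class ToyFlow:
    def __init__(self) -> None:
        self.tasks: List[ToyTask] = []

    def run(self) -> list:
        return [t.run() for t in self.tasks]


class EagerETL(ToyTask):
    """Like flow_3.py's ETL: the query happens in __init__."""

    def __init__(self) -> None:
        calls.append("eager query")  # runs while the flow is being built

    def run(self):
        return "eager: nothing left to do"


class DeferredETL(ToyTask):
    """Work moved into run(), so it waits for the flow run."""

    def run(self):
        calls.append("deferred query")  # runs only when the flow runs
        return "rows"


flow = ToyFlow()
EagerETL()(flow)
DeferredETL()(flow)
print(calls)       # ['eager query']
print(flow.run())  # ['eager: nothing left to do', 'rows']
print(calls)       # ['eager query', 'deferred query']
```

Only the class whose work sits in run() waits for the flow run; the eager one has already done its query by the time the flow object exists.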