Chohang Ng
05/27/2021, 4:55 PMKevin Kho
Chohang Ng
05/27/2021, 5:29 PMKevin Kho
Chohang Ng
05/27/2021, 5:31 PMKevin Kho
Chohang Ng
05/27/2021, 5:33 PMChohang Ng
05/27/2021, 5:33 PMKevin Kho
Chohang Ng
05/27/2021, 5:34 PMChohang Ng
05/27/2021, 5:35 PMChohang Ng
05/27/2021, 5:35 PMKevin Kho
flow.run()
in your python script that contains the Flow?Chohang Ng
05/27/2021, 5:36 PMimport pendulum ## for crons timestamp
from prefect import Flow
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun ## flow of flows
from prefect import task, Flow, Parameter
import prefect
from flow_1 import *
from flow_2 import *
from prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor
# Schedule for the parent flow. The cron fields are minute=1, hour=1,
# day-of-month=1; the start date is "now" in US/Mountain, evaluated when this
# script is executed.
# NOTE(review): the name says "weekday" but the expression has a
# day-of-month field set and no day-of-week restriction - confirm intent.
weekday_schedule = CronSchedule(
    "1 1 1 * *",
    start_date=pendulum.now(tz="US/Mountain"),
)

# One StartFlowRun task per child flow registered in the 'tester' project.
# Both are created with wait=True.
flow_1_flow = StartFlowRun(
    flow_name='flow_1',
    project_name='tester',
    wait=True,
)
flow_2_flow = StartFlowRun(
    flow_name='flow_2',
    project_name='tester',
    wait=True,
)

# Parent flow: flow_2's task is declared upstream of flow_1's task.
with Flow(
    "main-flow",
    schedule=weekday_schedule,
    executor=LocalDaskExecutor(),
    run_config=LocalRun(),
) as flow:
    flow_1_flow.set_upstream(flow_2_flow)

# Registration happens as a side effect of running this script.
flow.register(project_name='tester')
Chohang Ng
05/27/2021, 5:37 PMimport pandas as pd
import numpy as np
import os,sys
from prefect.utilities.tasks import task
import db as db
from prefect import task, Flow, Parameter
import prefect
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor
# Kept so that code importing ``read_conn`` from this module keeps working;
# the tasks below look the connection up themselves and do not use this alias.
read_conn = db.read_conn


@task
def extract():
    """Query the bundle/product rows for product 7962 and return a DataFrame.

    Decorated with ``@task`` so that calling it inside the ``with Flow``
    block adds a task to the flow instead of executing the query right there
    while the flow is being built.

    Returns:
        The result of ``pd.read_sql`` for the query below.
    """
    # Fetch the connection inside the task body rather than relying on a
    # module-level object.
    # NOTE(review): presumably this also avoids having to serialize a live
    # connection together with the flow - confirm against how ``db`` works.
    conn = db.read_conn
    query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
from hq.oproducts p
JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
WHERE b.oproduct_id = 7962"""
    return pd.read_sql(query, conn)


@task
def load(df):
    """Write ``df`` to ``df.csv`` (relative to the working directory).

    Args:
        df: the DataFrame produced by ``extract``.
    """
    df.to_csv("df.csv", index=False)


# With both functions being tasks, the calls below only declare the
# extract -> load dependency; nothing is queried or written at build time.
with Flow('flow_2', executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
    df = extract()
    load(df)

flow.register(project_name="tester")
Chohang Ng
05/27/2021, 5:38 PMKevin Kho
# Task templates, one per child flow in the 'tester' project.
flow_1_flow = StartFlowRun(
    flow_name='flow_1',
    project_name='tester',
    wait=True,
)
flow_2_flow = StartFlowRun(
    flow_name='flow_2',
    project_name='tester',
    wait=True,
)

with Flow(
    "main-flow",
    schedule=weekday_schedule,
    executor=LocalDaskExecutor(),
    run_config=LocalRun(),
) as flow:
    # Call each template inside the flow context, then order the results:
    # ``b`` (flow_2) is upstream of ``a`` (flow_1).
    a = flow_1_flow()
    b = flow_2_flow()
    a.set_upstream(b)
Kevin Kho
Chohang Ng
05/27/2021, 5:39 PMChohang Ng
05/27/2021, 5:39 PMChohang Ng
05/27/2021, 5:42 PMKevin Kho
Chohang Ng
05/27/2021, 5:43 PMChohang Ng
05/27/2021, 5:43 PMimport pendulum ## for crons timestamp
from prefect import Flow
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun ## flow of flows
from prefect import task, Flow, Parameter
import prefect
from flow_1 import *
from flow_2 import *
from prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor
# Cron schedule (minute 1, hour 1, day-of-month 1) anchored at the current
# US/Mountain time when the script runs.
weekday_schedule = CronSchedule(
    "1 1 1 * *",
    start_date=pendulum.now(tz="US/Mountain"),
)

# Templates for the tasks that start the two child flows; both use wait=True.
flow_1_flow = StartFlowRun(
    flow_name='flow_1',
    project_name='tester',
    wait=True,
)
flow_2_flow = StartFlowRun(
    flow_name='flow_2',
    project_name='tester',
    wait=True,
)

with Flow(
    "main-flow",
    schedule=weekday_schedule,
    executor=LocalDaskExecutor(),
    run_config=LocalRun(),
) as flow:
    # Calling the templates inside the flow context yields the tasks that
    # are then ordered: flow_2's task (b) is upstream of flow_1's task (a).
    a = flow_1_flow()
    b = flow_2_flow()
    a.set_upstream(b)

# Register the parent flow in the same project as its children.
flow.register(project_name='tester')
Kevin Kho
@task
?Chohang Ng
05/27/2021, 5:45 PMChohang Ng
05/27/2021, 5:46 PMKevin Kho
read_conn = db.read_conn
into extract
?Kevin Kho
Chohang Ng
05/27/2021, 5:48 PMChohang Ng
05/27/2021, 5:49 PMKevin Kho
Chohang Ng
05/27/2021, 5:52 PMKevin Kho
Chohang Ng
05/27/2021, 5:55 PMKevin Kho
extract
and load
so that StartFlowRun runs the updated flow2Chohang Ng
05/27/2021, 5:56 PMChohang Ng
05/27/2021, 5:56 PMChohang Ng
05/27/2021, 5:56 PMimport pandas as pd
import numpy as np
import os,sys
from prefect.utilities.tasks import task
import db as db
from prefect import task, Flow, Parameter
import prefect
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor
@task
def extract():
    """Task: read the bundle/product rows for product 7962 into a DataFrame.

    The connection is taken from ``db.read_conn`` inside the task body and
    handed to ``pd.read_sql`` together with the query.
    """
    read_conn = db.read_conn
    query = (
        """SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
from hq.oproducts p
JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
WHERE b.oproduct_id = 7962"""
    )
    return pd.read_sql(query, read_conn)
@task
def load(df):
    """Task: dump ``df`` to ``df.csv`` without the index column."""
    target = "df.csv"
    df.to_csv(target, index=False)
# flow_2: the output of ``extract`` is passed to ``load``.
with Flow(
    'flow_2',
    executor=LocalDaskExecutor(),
    run_config=LocalRun(),
) as flow:
    df = extract()
    load(df)

# Registering is a side effect of executing this script.
flow.register(project_name="tester")
Kevin Kho
Chohang Ng
05/27/2021, 5:57 PMChohang Ng
05/27/2021, 5:58 PMKevin Kho
Chohang Ng
05/27/2021, 9:50 PMTask
subclasses. But the csv file is only generated during the register processChohang Ng
05/27/2021, 9:50 PMfrom prefect import Task
import pandas as pd
import os,sys
from prefect.utilities.tasks import task
import db as db
from prefect import task, Flow, Parameter
import prefect
from prefect.run_configs import LocalRun
from prefect.executors import LocalDaskExecutor
class ETL(Task):
    """Prefect task that queries bundle/product rows and writes them to CSV.

    The work is done in ``run``, the method Prefect invokes when the task
    executes as part of a flow run. Constructing the task performs no
    database or file access.
    """

    def __init__(self, **kwargs):
        """Set up the task.

        Args:
            **kwargs: forwarded unchanged to ``Task.__init__``.
        """
        # The base initialiser must run so the instance is a fully
        # initialised Task before it is added to a flow.
        super().__init__(**kwargs)
        # Filled in by ``run`` (or lazily by ``load``).
        self.df = None

    def extract(self):
        """Run the query for product 5801 and return the rows as a DataFrame."""
        read_conn = db.read_conn
        query ="""SELECT b.oproduct_id, p.oproduct_id,p.oproduct_parent_id,b.obundle_parent_id
from hq.oproducts p
JOIN hq.obundles b ON b.oproduct_id = p.oproduct_id
WHERE b.oproduct_id = 5801"""
        df = pd.read_sql(query,read_conn)
        return df

    def load(self):
        """Write ``self.df`` to the CSV file, extracting first if needed."""
        # Keeps ``ETL().load()`` usable on its own now that the constructor
        # no longer runs the query.
        if self.df is None:
            self.df = self.extract()
        # The path previously contained chat auto-link markup
        # ("<http://...|...>") in place of the user directory name.
        self.df.to_csv(r"C:\Users\cho.ng\test\df.csv",index=False)

    def run(self):
        """Extract the rows and write the CSV; executed at flow-run time."""
        self.df = self.extract()
        self.load()


with Flow('flow_3',executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
    # Calling the task instance inside the flow context adds it to the flow;
    # calling ``load()`` here would instead write the CSV while the flow is
    # being built and registered.
    etl = ETL()
    df = etl()

flow.register(project_name="tester")
Chohang Ng
05/27/2021, 9:52 PMKevin Kho