Chohang Ng
06/03/2021, 10:07 PMnicholas
06/03/2021, 10:09 PMChohang Ng
06/03/2021, 10:10 PMwith Flow("parent-flow", schedule=weekday_schedule,
executor=LocalExecutor(),
run_config=LocalRun()) as flow:
ads_step_1 = oAdvertisementProducts_flow_b(upstream_tasks = [advertisement_fee_a()])
ads_step_2 = aadvfeesbyorderitem_flow_c(upstream_tasks = [ads_step_1, aadvfeesbychannel_flow_e(),aDTC_ads_flow_f()])
ads_step_3 = AdvertisementFee_flow_d(upstream_tasks = [ads_step_2])
upload_all_fee = all_fee_upload_flow(upstream_tasks = [invoice_adjust_flow(), channelfees_flow(),ShippingFees_flow(),ShippingMaterialFees_flow(),StorageFees_flow(),laborfee_flow(),cogs_flow(),customer_fee_flow(),commission_rebate_flow(),transfer_fee_flow(),ads_step_3,promo_flow()])
upload_revenue = revenue_upload_flow(upstream_tasks = [upload_all_fee])
abi_profitability = abi_profit_flow(upstream_tasks = [upload_revenue])
delete_data = delete_fee_flow(upstream_tasks= [abi_profitability])
flow.register(project_name=project_name)
nicholas
06/03/2021, 10:15 PMChohang Ng
06/03/2021, 10:15 PMclass Create_tmp(Task):
def run(self,script,datestr):
nicholas
06/03/2021, 10:17 PMtask
that Prefect exports?Chohang Ng
06/03/2021, 10:19 PMfrom task import Extract,Load,Create_tmp,Drop_tmp,Load_revenue_fee,Load_profit_dash,Delete_data
nicholas
06/03/2021, 10:20 PMChohang Ng
06/03/2021, 10:22 PMnicholas
06/03/2021, 10:23 PMChohang Ng
06/03/2021, 10:23 PMwith Flow("1_oAdvertisements_flow", executor=LocalExecutor(),
run_config=LocalRun()) as advertisement_fee:
df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql',datestr)
Load()(df,'oAdvertisements',upstream_tasks= [df])
advertisement_fee.register(project_name = project_name)
nicholas
06/03/2021, 10:24 PMChohang Ng
06/03/2021, 10:24 PMnicholas
06/03/2021, 10:24 PMnohup prefect agent start -p <PATH OF THE DIRECTORY WHERE THE FLOWS ARE STORED> -l dev
Chohang Ng
06/03/2021, 10:28 PMnicholas
06/03/2021, 10:28 PMChohang Ng
06/03/2021, 10:28 PMnicholas
06/03/2021, 10:30 PMChohang Ng
06/03/2021, 10:31 PMwith Flow("1_oAdvertisements_flow", executor=LocalExecutor(),
run_config=LocalRun()) as advertisement_fee:
df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql',datestr)
Load()(df,'oAdvertisements',upstream_tasks= [df])
nicholas
06/03/2021, 10:32 PMChohang Ng
06/03/2021, 10:32 PMnicholas
06/03/2021, 10:34 PMChohang Ng
06/03/2021, 10:34 PMadvertisement_fee_a = StartFlowRun(flow_name='1_oAdvertisements_flow',project_name=project_name,wait = True)
with Flow("parent-flow", schedule=weekday_schedule,
executor=LocalExecutor(),
run_config=LocalRun()) as flow:
ads_step_1 = oAdvertisementProducts_flow_b(upstream_tasks = [advertisement_fee_a()])
nicholas
06/03/2021, 10:35 PMChohang Ng
06/03/2021, 10:36 PMnicholas
06/03/2021, 10:40 PMStartFlowRun
task, you're explicitly creating a run of a different flow, which executes completely on its own and reports its state back to the parent flowChohang Ng
06/03/2021, 10:41 PMnicholas
06/03/2021, 10:44 PMChohang Ng
06/03/2021, 10:46 PMnicholas
06/03/2021, 10:50 PMChohang Ng
06/03/2021, 10:51 PMnicholas
06/03/2021, 10:52 PMChohang Ng
06/03/2021, 10:54 PMnicholas
06/03/2021, 11:00 PMChohang Ng
06/03/2021, 11:08 PMnicholas
06/03/2021, 11:09 PMChohang Ng
06/03/2021, 11:13 PMfrom prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun
from prefect.executors import LocalExecutor
from prefect import task, Flow, Parameter
import prefect
import os,sys
from task import Extract,Load,Create_tmp,Drop_tmp,Load_revenue_fee,Load_profit_dash,Delete_data
import db as db ## credential
with Flow("1_oAdvertisements_flow", executor=LocalExecutor(),
run_config=LocalRun()) as advertisement_fee:
df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql',1)
Load()(df,'oAdvertisements',upstream_tasks= [df])
advertisement_fee.register(project_name = 'profit_test')
nicholas
06/03/2021, 11:29 PMtask
from prefect
as well as calling from task import ...
- it's usually not a good idea to name your python files the same as other things you're importing, since serializers like cloudpickle
won't be able to differentiate the 2Chohang Ng
06/04/2021, 12:29 AMfrom prefect import task, Flow, Parameter
import prefect
import os,sys
from etl import Extract,Load,Create_tmp,Drop_tmp,Load_revenue_fee,Load_profit_dash,Delete_data
import db as db ## credential
import pendulum ## for crons timestamp
from prefect import Flow
from prefect.schedules import CronSchedule
from prefect.tasks.prefect import StartFlowRun ## flow of flows
import datetime
from prefect.agent.local import LocalAgent
from prefect.run_configs import LocalRun
from prefect.executors import LocalExecutor
with Flow("1_oAdvertisements_flow", executor=LocalExecutor(),
run_config=LocalRun()) as advertisement_fee:
df = Extract()('Advertisements_SQL\\Get_oAdvertisements.sql',1)
Load()(df,'oAdvertisements',upstream_tasks= [df])
advertisement_fee.register(project_name = 'profit_test')
nicholas
06/04/2021, 2:35 PMChohang Ng
06/04/2021, 3:26 PMnicholas
06/04/2021, 3:28 PMChohang Ng
06/04/2021, 5:40 PMnicholas
06/04/2021, 5:45 PMChohang Ng
06/04/2021, 5:50 PMGeorge Coyne
06/04/2021, 6:04 PM