Chohang Ng
06/03/2021, 8:18 PMKevin Kho
06/03/2021, 8:20 PMupstream_task
parameter. That you can use. Let me make a quick exampleChohang Ng
06/03/2021, 8:21 PMKevin Kho
06/03/2021, 8:25 PMfrom prefect import Flow, task, Task
import time
from prefect.executors import LocalDaskExecutor
from datetime import timedelta
@task()
def abc(x):
return x
class B(Task):
def run(self, x):
return x
with Flow(
"test",
) as flow:
test = abc(1)
test2 = B()(1, upstream_tasks=[test])
flow.run()
Chohang Ng
06/03/2021, 8:27 PMKevin Kho
06/03/2021, 8:28 PMtime.sleep(x)
@task()
def abc(x):
pass
Spencer
06/03/2021, 8:28 PM@task
def get_datestr():
return prefect.context.today
create_tmp = Create_Tmp()
extract = Extract()
load = Load()
drop = Drop_tmp()
with Flow(..) as flow:
datestr = get_datestr()
shipcosts = create_tmp('ShippingFees_SQL\\Create_shipcostsTemp.sql', datestr)
shiptotals = create_tmp('ShippingFees_SQL\\Create_shiptotalsTemp.sql', datestr)
df = extract('ShippingFees_SQL\\GetShippingFeesByOrderItem.sql', datestr)
df.set_upstream(shipcosts)
df.set_upstream(shiptotals)
load_fees = load(df, 'oFeesByOrderItem')
run_drop = drop('ShippingFees_SQL\\Drop_shippingTemp.sql')
run_drop.set_upstream(load_fees)
Kevin Kho
06/03/2021, 8:29 PMChohang Ng
06/03/2021, 8:29 PMKevin Kho
06/03/2021, 8:31 PMChohang Ng
06/03/2021, 8:32 PMtest2 = B()(1, upstream_tasks=[test])
you did this in the last example. It looks like you pass upstream_tasks to the run method of BKevin Kho
06/03/2021, 8:33 PMkwargs
.