
Chohang Ng

06/03/2021, 8:18 PM
Here Drop_tmp was executed after the create tmp step, so the dependency was lost. According to my flow, Drop_tmp is supposed to execute last. What am I missing? If I don't set upstream and downstream tasks, can they execute out of order? When I look at my other flows, most of them still run fine.

Kevin Kho

06/03/2021, 8:20 PM
Hey @Chohang Ng, the order in which you call your tasks does not define the dependency. Tasks take an upstream_tasks parameter that you can use. Let me make a quick example.

Chohang Ng

06/03/2021, 8:21 PM
Ok. I was using upstream_tasks in flow-of-flows setups, but not within individual flows.

Kevin Kho

06/03/2021, 8:25 PM
from prefect import Flow, task, Task

# A function-based task
@task()
def abc(x):
    return x

# A class-based task; its logic lives in run()
class B(Task):
    def run(self, x):
        return x

with Flow(
    "test",
) as flow:
    test = abc(1)
    # upstream_tasks adds a dependency edge: B only runs after abc finishes
    test2 = B()(1, upstream_tasks=[test])

flow.run()
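Why call order alone is not enough can be sketched with Python's standard-library graphlib (3.9+) as a stand-in for the flow's dependency graph. This is an analogy, not Prefect's scheduler: only declared edges constrain the order, so with no edge both tasks are ready at the same time and a parallel executor could start either one first. The task names mirror the example above; first_ready is a hypothetical helper.

```python
from graphlib import TopologicalSorter

def first_ready(graph):
    """Return the tasks that have no unfinished upstream tasks at the start."""
    ts = TopologicalSorter(graph)  # graph maps task -> set of upstream tasks
    ts.prepare()
    return set(ts.get_ready())

# Same two tasks, called in the same order; only the declared edges differ.
without_edge = {"test": set(), "test2": set()}
with_edge = {"test": set(), "test2": {"test"}}  # like upstream_tasks=[test]

print(sorted(first_ready(without_edge)))  # ['test', 'test2']: either may start first
print(sorted(first_ready(with_edge)))     # ['test']: test2 must wait
```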

Chohang Ng

06/03/2021, 8:27 PM
What if abc doesn't return anything? Do you still assign abc(1) to test?

Kevin Kho

06/03/2021, 8:28 PM
You can test with this; it should still work. Or just put a time.sleep(x) in the body.
@task()
def abc(x):
    # Returns None; abc(1) still gives a task you can pass to upstream_tasks
    pass
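The point that a task returning nothing can still be an upstream dependency can be illustrated with a toy runner (not Prefect's engine). The ordering comes from the declared edge, and the upstream task's return value, here None, plays no part. The runner, the task names, and the dict-based graph are assumptions for illustration, built on Python's standard-library graphlib (3.9+).

```python
from graphlib import TopologicalSorter

log = []

def create_tmp():
    log.append("create_tmp")  # side effect only; implicitly returns None

def drop_tmp():
    log.append("drop_tmp")

def run_flow(tasks, upstream):
    """Hypothetical mini-runner: call each task after all of its upstream tasks."""
    results = {}
    for name in TopologicalSorter(upstream).static_order():
        results[name] = tasks[name]()
    return results

results = run_flow(
    {"create_tmp": create_tmp, "drop_tmp": drop_tmp},
    {"drop_tmp": {"create_tmp"}},  # drop_tmp waits for create_tmp
)
print(log)                    # ['create_tmp', 'drop_tmp']
print(results["create_tmp"])  # None
```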

Spencer

06/03/2021, 8:28 PM
This is how I would go about it, if I understood the intent correctly.
import prefect
from prefect import Flow, task

@task
def get_datestr():
  # today's date as a string, read from the Prefect run context
  return prefect.context.today

# Create_Tmp, Extract, Load and Drop_tmp are your own Task subclasses
create_tmp = Create_Tmp()
extract = Extract()
load = Load()
drop = Drop_tmp()

with Flow(..) as flow:
  datestr = get_datestr()

  shipcosts = create_tmp('ShippingFees_SQL\\Create_shipcostsTemp.sql', datestr)
  shiptotals = create_tmp('ShippingFees_SQL\\Create_shiptotalsTemp.sql', datestr)

  # extract must wait for both temp tables to exist
  df = extract('ShippingFees_SQL\\GetShippingFeesByOrderItem.sql', datestr)
  df.set_upstream(shipcosts)
  df.set_upstream(shiptotals)

  load_fees = load(df, 'oFeesByOrderItem')

  # drop the temp tables only after the load has finished
  run_drop = drop('ShippingFees_SQL\\Drop_shippingTemp.sql')
  run_drop.set_upstream(load_fees)
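The edges in Spencer's flow can be checked with a small toy model. The sketch below groups tasks into waves that could run in parallel once their upstream tasks finish, using graphlib from the standard library (3.9+). The node names are shorthand labels chosen for illustration, not Prefect identifiers, and this is not how Prefect itself schedules tasks.

```python
from graphlib import TopologicalSorter

# Edges from the flow above, written as {task: {upstream tasks}}.
graph = {
    "get_datestr": set(),
    "create_shipcosts": {"get_datestr"},
    "create_shiptotals": {"get_datestr"},
    "extract": {"get_datestr", "create_shipcosts", "create_shiptotals"},
    "load": {"extract"},
    "drop": {"load"},  # the set_upstream(load_fees) edge
}

def batches(graph):
    """Group tasks into waves whose upstream tasks have all finished."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    waves = []
    while ts.is_active():
        ready = sorted(ts.get_ready())
        waves.append(ready)
        ts.done(*ready)  # mark the whole wave as finished
    return waves

for wave in batches(graph):
    print(wave)
```

With the drop edge in place, drop can only appear in the final wave; the two create_tmp tasks share a wave because nothing orders them relative to each other.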

Kevin Kho

06/03/2021, 8:29 PM
@Spencer gives a nice alternative

Chohang Ng

06/03/2021, 8:29 PM
So you can pass upstream_tasks to the run method?

Kevin Kho

06/03/2021, 8:31 PM
You mean in the class definition, right? I don't think so, because the instances of those tasks won't be defined yet at build time.

Chohang Ng

06/03/2021, 8:32 PM
test2 = B()(1, upstream_tasks=[test])
You did this in the last example. It looks like you pass upstream_tasks to the run method of B.

Kevin Kho

06/03/2021, 8:33 PM
Oh, I thought you meant in the class definition. Yes, calling the task this way accepts all the task kwargs, such as upstream_tasks.
The code snippet I sent fully works.
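The distinction Kevin draws can be illustrated with a simplified stand-in for a task class, assuming behavior similar to Prefect 1.x, where calling a task instance accepts task-level keywords such as upstream_tasks alongside run()'s own arguments. ToyTask and its execute() method are hypothetical and are not Prefect's implementation; the point is only that upstream_tasks is consumed by the call and never reaches run().

```python
import inspect

class ToyTask:
    """Hypothetical stand-in for a task class; not Prefect's implementation."""

    def __init__(self):
        self.upstream = []     # dependencies recorded when the task is called
        self.bound = ((), {})  # arguments saved for a later run()

    def run(self, x):
        return x

    def __call__(self, *args, upstream_tasks=None, **kwargs):
        # Task-level keywords such as upstream_tasks are consumed here...
        self.upstream = list(upstream_tasks or [])
        # ...and only the remaining arguments are checked against run().
        inspect.signature(self.run).bind(*args, **kwargs)
        self.bound = (args, kwargs)
        return self

    def execute(self):
        args, kwargs = self.bound
        return self.run(*args, **kwargs)

test = ToyTask()(1)
test2 = ToyTask()(1, upstream_tasks=[test])
print(test2.upstream == [test])  # True
print(test2.execute())           # 1
```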