Here the Drop_tmp was executed after the create tm...
# prefect-server
c
Here the Drop_tmp was executed after the create tmp and thus the dependency was lost. Drop_tmp is supposed to execute last according to my flow. What am I missing? If I don't set upstream and downstream, it might execute out of order? But when I look at other flows, most of them are still fine.
k
Hey @Chohang Ng, so the order of which you call your functions does not define the dependency. Tasks take an
upstream_task
parameter. That you can use. Let me make a quick example
c
Ok. I was using upstream_task in flows of flows but not individual flows.
k
Copy code
from prefect import Flow, task, Task
import time
from prefect.executors import LocalDaskExecutor
from datetime import timedelta

@task()
def abc(x):
    return x

class B(Task):
    def run(self, x):
        return x

with Flow(
    "test",
) as flow:
    test = abc(1)
    test2 = B()(1, upstream_tasks=[test])

flow.run()
c
what if the def abc doesn't return anything, do you still store the abc(1) to test?
k
You can test with this. Still should work. Or just use a
time.sleep(x)
Copy code
@task()
def abc(x):
    pass
s
This is how I would go about it, I think I understood the intent.
Copy code
@task
def get_datestr():
  return prefect.context.today

create_tmp = Create_Tmp()
extract = Extract()
load = Load()
drop = Drop_tmp()

with Flow(..) as flow:
  datestr = get_datestr()
  
  shipcosts = create_tmp('ShippingFees_SQL\\Create_shipcostsTemp.sql', datestr)
  shiptotals = create_tmp('ShippingFees_SQL\\Create_shiptotalsTemp.sql', datestr)

  df = extract('ShippingFees_SQL\\GetShippingFeesByOrderItem.sql', datestr)
  df.set_upstream(shipcosts)
  df.set_upstream(shiptotals)

  load_fees = load(df, 'oFeesByOrderItem')

  run_drop = drop('ShippingFees_SQL\\Drop_shippingTemp.sql')
  run_drop.set_upstream(load_fees)
👍 1
k
@Spencer gives a nice alternative
c
so you can pass upstream_tasks to the run method?
k
You mean in the class definition right? I don’t think so because the instances of those tasks won’t be defined yet during build time
c
Copy code
test2 = B()(1, upstream_tasks=[test])
you did this in the last example. It looks like you pass upstream_tasks to the run method of B
k
Oh I thought you meant in the class definition. Yes this takes all the task
kwargs
.
The code snippet I sent fully works
🙌 1