Jason Motley
12/14/2021, 6:27 PM
Aninda Bhattacharjee
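12/14/2021, 6:31 PM
from prefect import Flow, task
from prefect.triggers import all_successful

@task
def task1(i):
    print(f"task{i}_1")

@task(trigger=all_successful)
def task2(i):
    print(f"task{i}_2")

i = 1  # example value; `i` was never defined in the original snippet
with Flow('new Flow') as flow:
    # Use new names so the task functions themselves are not shadowed
    t1 = task1(i)
    t2 = task2(i, upstream_tasks=[t1])
flow.run()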
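Aninda's snippet sets `trigger=all_successful` on task2. In Prefect 1, a trigger looks at the states of a task's upstream tasks and decides whether the task may run. `all_successful` is already the default. The toy functions below only mimic that decision and are not Prefect's code. Here states are plain strings, whereas the real triggers in `prefect.triggers` work on State objects and raise a signal rather than returning False.

```python
# Toy model of a trigger's decision (NOT Prefect's implementation).
# Assumption: upstream states are plain strings instead of Prefect State objects.
def all_successful(upstream_states):
    """Allow the downstream task to run only if every upstream task succeeded."""
    return all(state == "Success" for state in upstream_states)


def all_finished(upstream_states):
    """Allow the downstream task to run once every upstream task has finished, pass or fail."""
    return all(state in {"Success", "Failed"} for state in upstream_states)


upstream = ["Success", "Failed"]
print(all_successful(upstream))  # False: task2 would not run
print(all_finished(upstream))    # True: task2 would still run
```

Choosing the trigger is how you say whether a downstream task (for example a load) should still run after the delete fails.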
Jason Motley
12/14/2021, 6:33 PM
Jason Motley
12/14/2021, 6:33 PM
Jason Motley
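12/14/2021, 6:34 PM
from datetime import timedelta
from typing import Any

import sqlalchemy as db  # assumption: `db` refers to SQLAlchemy
from prefect import task

@task(name="Delete Records on Date", log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=10))
def deleter_date(connection: Any, table_name: str, index_date) -> int:
    engine = connection
    with engine.connect() as connection:
        result = connection.execute(db.text(f"""
            delete
            from {table_name}
            WHERE Date > '{index_date}'
        """))
        deleted = result.rowcount  # rows matched by the DELETE
    return deleted  # the original bare `result` line returned nothing

with flow bla bla:
    deleter_date()  # this works
    delete_stuff = deleter_date()  # This does not work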
Jason Motley
12/14/2021, 6:36 PM
Jason Motley
12/14/2021, 6:40 PM
Kevin Kho
with flow bla bla:
    a = deleter_date()
    something_else(upstream_tasks=[a])
Kevin Kho
Aninda Bhattacharjee
12/14/2021, 6:41 PM
with flow bla bla:
    task1 = deleter_date()  # this works
    delete_stuff = deleter_date(upstream_tasks=[task1])
And set a trigger on the deleter_date task.
Jason Motley
12/14/2021, 6:41 PM
Kevin Kho
Jason Motley
12/14/2021, 6:43 PM
Jason Motley
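12/14/2021, 6:43 PM
# Delete on date
from datetime import timedelta
from typing import Any

import sqlalchemy as db  # assumption: `db` refers to SQLAlchemy
from prefect import task

@task(name="Delete Records on Date", log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=10))
def deleter_date(connection: Any, table_name: str, index_date) -> int:
    engine = connection
    with engine.connect() as connection:
        result = connection.execute(db.text(f"""
            delete
            from {table_name}
            WHERE DATE(Date) > '{index_date}'
        """))
        deleted = result.rowcount  # rows matched by the DELETE
    return deleted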
Jason Motley
12/14/2021, 6:44 PM
with flow():
    delete = deleter_date(connection_stuf)  # <- says it executes but does not delete rows
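When a delete task "says it executes but does not delete rows", it helps to have the task report how many rows the DELETE matched and to commit explicitly. The sketch below does this with the standard-library `sqlite3` module instead of Jason's SQLAlchemy engine and database. The table `sales`, its rows and the helper name `delete_after_date` are all hypothetical. With SQLAlchemy, the analogous pieces would be `engine.begin()`, which commits when its block succeeds, and `result.rowcount`.

```python
import sqlite3


def delete_after_date(conn: sqlite3.Connection, table_name: str, index_date: str) -> int:
    """Hypothetical helper: delete rows whose Date falls after index_date, return the count."""
    # The date is a bound parameter rather than pasted into the SQL string.
    # Table names cannot be bound, so only pass trusted, hard-coded names here.
    sql = f"DELETE FROM {table_name} WHERE DATE(Date) > ?"
    with conn:  # commits if the block succeeds, rolls back if it raises
        cur = conn.execute(sql, (index_date,))
        deleted = cur.rowcount
    return deleted


# Hypothetical table and rows, purely for illustration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (Date TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [
        ("2021-12-01 09:00:00", 10.0),
        ("2021-12-10 12:30:00", 20.0),
        ("2021-12-14 18:00:00", 30.0),
    ],
)
conn.commit()

deleted = delete_after_date(conn, "sales", "2021-12-05")
remaining = conn.execute("SELECT COUNT(*) FROM sales").fetchone()[0]
print(deleted, remaining)  # 2 1
```

If the count comes back as 0, the WHERE clause matched nothing. That points at the date comparison rather than at how the task is wired into the flow.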
Kevin Kho
Jason Motley
12/14/2021, 6:46 PM
Kevin Kho
with flow bla bla:
    deleter_date()  # this works
    delete_stuff = deleter_date()  # This does not work
These ways of calling a task are pretty much equivalent.
Kevin Kho
with flow bla bla:
    delete_stuff = deleter_date()  # This does not work
    deleter_date()  # this works
Does the top one work?
Jason Motley
12/14/2021, 6:48 PM
Jason Motley
12/14/2021, 6:48 PM
Jason Motley
12/14/2021, 6:48 PM
= to anything
Jason Motley
12/14/2021, 6:48 PM
Kevin Kho
Jason Motley
12/14/2021, 6:52 PM
Jason Motley
12/14/2021, 6:53 PM
Kevin Kho
Jason Motley
12/14/2021, 6:53 PM
Jason Motley
12/14/2021, 6:54 PM
Jason Motley
12/14/2021, 6:54 PM
with flow():
    df = extract(stuff)
    delete = delete_records(connections)  # This shows the task working but no deletion happens
    load_task = load(df, upstream_tasks=[delete])
Jason Motley
12/14/2021, 6:55 PM
with flow():
    df = extract(stuff)
    delete_records()  # This works but will run out of order
    load_task = load(df)  # Cannot specify delete_records() as upstream task
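In the second version, `delete_records()` is never assigned to a name, so `load` cannot list it in `upstream_tasks`. Nothing in the graph then says the delete must come first. The first version fixes this by adding a dependency edge. As a rough analogy, the sketch uses the standard library's `graphlib` (Python 3.9+), not Prefect, to show how such an edge forces an order. The step names simply mirror Jason's flow.

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Each step maps to the set of steps that must finish before it starts.
# This mirrors passing upstream_tasks=[delete] to load; it is plain graphlib, not Prefect.
dependencies = {
    "extract": set(),
    "delete_records": set(),
    "load": {"extract", "delete_records"},  # load waits for both
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # extract and delete_records may come in either order, but load is always last
```

If `"delete_records"` is dropped from load's set, the sorter may still place it before `load`, but nothing guarantees that.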
Kevin Kho
Jason Motley
12/14/2021, 6:57 PM
Kevin Kho
import prefect
from datetime import timedelta
from typing import Any

import sqlalchemy as db  # assumption: `db` refers to SQLAlchemy
from prefect import task

@task(name="Delete Records on Date", log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=10))
def deleter_date(connection: Any, table_name: str, index_date) -> int:
    engine = connection
    prefect.context.logger.info("DELETE IS STARTING")
    with engine.connect() as connection:
        result = connection.execute(db.text(f"""
            delete
            from {table_name}
            WHERE DATE(Date) > '{index_date}'
        """))
        deleted = result.rowcount  # rows matched by the DELETE
    prefect.context.logger.info("DELETE HAS ENDED")
    return deleted
You need to import prefect to use this.
Jason Motley
12/14/2021, 7:01 PM
Kevin Kho
Jason Motley
12/14/2021, 7:11 PM
Jason Motley
12/14/2021, 7:27 PM
Kevin Kho
Jason Motley
12/14/2021, 7:31 PM
Jason Motley
12/14/2021, 7:32 PM
delete_stuff() as an upstream task? Why does it need to be set equal to something, e.g. deleter = delete_stuff()?
Jason Motley
12/14/2021, 7:32 PM
Kevin Kho
delete_stuff? I don’t recommend it, but I think it might work if there is only one instance. But to answer you, it’s because tasks can be reused, so you need to point to some reference.
Jason Motley
12/14/2021, 7:36 PM
Kevin Kho
from prefect import Flow, task

@task
def abc():
    return 1

@task
def bcd():
    return 2

with Flow("test") as flow:
    abc()
    bcd(upstream_tasks=[abc])

flow.run()
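Kevin's answer, "tasks can be reused so you need to point to some reference", can be pictured with a toy model. Each call of a task inside a flow adds a new node, so the value a call returns is the only way to say which node another task depends on. `ToyFlow` and its `call` method are hypothetical and greatly simplified. They are not Prefect's implementation, though Prefect 1 similarly adds a copy of the task to the flow on every call.

```python
import itertools


class ToyFlow:
    """Hypothetical, heavily simplified stand-in for a flow (not Prefect's code)."""

    def __init__(self):
        self.nodes = []  # (node_id, task_name) for every call
        self.edges = []  # (upstream_id, downstream_id)
        self._ids = itertools.count()

    def call(self, task_name, upstream=()):
        # Every call creates a NEW node, even for the same task, so the
        # returned id is the only handle that pins down *this* instance.
        node_id = next(self._ids)
        self.nodes.append((node_id, task_name))
        for up in upstream:
            self.edges.append((up, node_id))
        return node_id


flow = ToyFlow()
first = flow.call("deleter_date")
second = flow.call("deleter_date")  # same task, second instance
load_node = flow.call("load", upstream=[second])
print(flow.nodes)  # [(0, 'deleter_date'), (1, 'deleter_date'), (2, 'load')]
print(flow.edges)  # [(1, 2)]
```

Passing the task name alone would be ambiguous here, because two `deleter_date` nodes exist. The returned id (in Prefect, the object assigned with `=`) is unambiguous.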
Jason Motley
12/14/2021, 7:42 PM
Jason Motley
12/14/2021, 7:43 PM
Jason Motley
12/14/2021, 7:43 PM
Kevin Kho
flow.visualize() if you have graphviz installed. I honestly just register and view it in the UI.
Kevin Kho
Jason Motley
12/14/2021, 7:45 PM
Jason Motley
12/14/2021, 7:46 PM
'bcd' is not recognized as an internal or external command,
operable program or batch file.
Jason Motley
12/14/2021, 7:46 PM
Jason Motley
12/14/2021, 7:48 PM
Kevin Kho
abc because it will call it automatically for you.
Jason Motley
12/14/2021, 7:51 PM
Jason Motley
12/14/2021, 7:53 PM
Jason Motley
12/14/2021, 7:53 PM
Kevin Kho
Jason Motley
12/14/2021, 7:54 PM
Kevin Kho