Does anyone have a dummy task available that would...
# ask-community
j
Does anyone have a dummy task available that would check whether a task above has run?
a
Hi @Jason Motley, you can use a trigger for this. Set the downstream task as dependent on the upstream task and use trigger=all_successful so the downstream task runs only if the upstream task ran successfully. Sharing a dummy stub for reference:
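from prefect import Flow, task
from prefect.triggers import all_successful

@task
def task1(i):
    print(f"task{i}_1")

# Runs only if all upstream tasks succeeded
@task(trigger=all_successful)
def task2(i):
    print(f"task{i}_2")

with Flow('new Flow') as flow:
    # Bind results to new names so the task definitions are not shadowed
    first = task1(1)
    second = task2(1, upstream_tasks=[first])

flow.run()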
j
What if the upstream task is not a task? I need to delete rows out of the database and then append new data.
For the deletion the only method I've been able to get working in Prefect is to run a connection.execute() statement
Example:
@task(name = "Delete Records on Date",log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=10))
def deleter_date(connection: any, table_name: str, index_date) -> any:
    engine = connection
    with engine.connect() as connection:
        result = connection.execute(db.text(f"""
        delete
        from {table_name}
        WHERE Date > '{index_date}'
        """))
    result 

with flow bla bla:
    deleter_date() #this works
    delete_stuff = deleter_date() #This does not work
but I need to be able to reference "delete_stuff" as an upstream task..
The issue with the proposed setup is that the delete statement never actually runs when you set it equal to "task 1" in the flow
k
Sorry, am a bit confused here. I think you need stuff to be a task to get a signal from it. If this fails, I think it will FAIL the task and downstream tasks do not run. If you are just asking how to set an upstream, it would be:
with flow bla bla:
    a = deleter_date()
    something_else(upstream_tasks=[a])
Do you get an error in the task?
a
Can't you do:
with flow bla bla:
    task1 = deleter_date() #this works
    delete_stuff = deleter_date(upstream_tasks=[task1])
and set a trigger on the deleter_date method
j
No, the error is that even though task "a" runs, it does not actually delete records in the database.
k
This sounds like you need to do one of two things:
1. Raise an error inside the task based on a given criterion
2. Check the criterion in the downstream task and raise an error there if it's not met
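A minimal sketch of option 1, written against the standard-library sqlite3 module rather than the SQLAlchemy engine used in the thread so that it runs on its own. The table name `records`, the sample rows, and the function name `delete_after_date` are all hypothetical. The thread never establishes why the original delete had no effect, so the explicit commit here is an assumption about one possible cause, not a diagnosis.

```python
import sqlite3

def delete_after_date(conn, index_date):
    """Delete rows newer than index_date, then verify that none remain."""
    # Bound parameter instead of f-string interpolation for the date value.
    cur = conn.execute("DELETE FROM records WHERE date > ?", (index_date,))
    conn.commit()  # assumption: without a commit the delete may not persist
    remaining = conn.execute(
        "SELECT COUNT(*) FROM records WHERE date > ?", (index_date,)
    ).fetchone()[0]
    if remaining:
        # Raising turns a silent no-op into a visible failure.
        raise RuntimeError(f"{remaining} rows newer than {index_date} remain")
    return cur.rowcount

# In-memory table standing in for the real database (hypothetical data).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (date TEXT, value INTEGER)")
conn.executemany(
    "INSERT INTO records VALUES (?, ?)",
    [("2021-01-01", 1), ("2021-02-01", 2), ("2021-03-01", 3)],
)
deleted = delete_after_date(conn, "2021-01-15")
print(deleted)
```

Inside a task, an uncaught exception like this one should mark the task as failed, so a downstream task with the all_successful trigger would not run.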
j
To approach it differently, why does the following setup show as the task running but then nothing actually happens in reality?
#Delete on date
@task(name = "Delete Records on Date",log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=10))
def deleter_date(connection: any, table_name: str, index_date) -> any:
    engine = connection
    with engine.connect() as connection:
        result = connection.execute(db.text(f"""
        delete
        from {table_name}
        WHERE DATE(Date) > '{index_date}'
        """))
        result
with flow():
   delete=deleter_date(connection_stuf) # <- says it executes but does not delete rows
k
There should not be a difference with the two ways the tasks are called. Is the behavior the same if you flip the order?
j
what do you mean?
k
with flow bla bla:
    deleter_date() #this works
    delete_stuff = deleter_date() #This does not work
these ways of calling a task are pretty much equivalent
What happens if you try
with flow bla bla:
    delete_stuff = deleter_date() #This does not work
    deleter_date() #this works
does the top one work?
j
yeah so deleter_date() works fine, except you cannot then reference it as an upstream task later on, because you have not set it = to anything
whereas delete_stuff=deleter_date() does NOT work but CAN be referenced as an upstream task
k
I am not seeing any reason why these calls would be different. Are you calling them both in the same Flow? Or you are running two flows with them one at a time? Are the inputs entirely equivalent?
j
same flow
Same inputs too
k
If you are calling them in the same flow, I am suggesting calling them in a different order to see if the order is the thing affecting it
j
I mean I'm running one or the other, the issue is that the one that DOES run cannot be called as an upstream task later on
I need to be able to do this basically:
with flow():
    df = extract(stuff)
    delete = delete_records(connections) #This shows the task working but no deletion happens
    load_task = load(df, upstream_tasks=[delete])
Whereas if it is set up like this, the delete runs before, after, or whenever relative to the load:
with flow():
    df = extract(stuff)
    delete_records() # This works but will run out of order
    load_task = load(df) # Cannot specify delete_records() as upstream task
k
I get that, but I really can’t think of any reason why these calls are different. Could you try adding logging statements before and after the SQL delete to get some insight?
j
sure, what is the best approach there?
k
@task(name = "Delete Records on Date",log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=10))
def deleter_date(connection: any, table_name: str, index_date) -> any:
    engine = connection
    prefect.context.logger.info("DELETE IS STARTING")
    with engine.connect() as connection:
        result = connection.execute(db.text(f"""
        delete
        from {table_name}
        WHERE DATE(Date) > '{index_date}'
        """))
    prefect.context.logger.info("DELETE HAS ENDED")
    result
you need to import prefect to use this
j
just fully import prefect?
k
yes for the logger
j
Kk, it'll take a bit to run, I'll let you know
@Kevin Kho
k
So the task runs and it actually goes through. I think to test this, instead of a delete, can you make it a query and log the results as well? I think both syntaxes should query right
j
Yup, will do
could you explain conceptually why I can't use delete_stuff() as an upstream task? Why does it need to be set equal to something, e.g. deleter = delete_stuff()?
Just kind of annoying since it works fine in normal python
k
Did you try it? Try setting upstream to delete_stuff? I don’t recommend it but I think it might work if there is only one instance. But to answer you, it’s because tasks can be reused, so you need to point to some reference
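A toy model of the point about reuse, in plain Python. `ToyTask` and `ToyFlow` are hypothetical classes and not Prefect's real ones. The sketch assumes, as the thread suggests, that each call of a task registers a fresh copy in the flow, so the value returned by the call is the only handle on that particular copy.

```python
import copy

class ToyTask:
    """Hypothetical stand-in for a task object; not Prefect's real class."""
    def __init__(self, name):
        self.name = name

    def __call__(self, flow, upstream=()):
        # Each call registers a fresh copy, so one definition can be reused.
        node = copy.copy(self)
        flow.add(node, upstream)
        return node

class ToyFlow:
    def __init__(self):
        self.nodes = []
        self.edges = []

    def add(self, node, upstream=()):
        for up in upstream:
            if not any(up is n for n in self.nodes):
                self.nodes.append(up)  # unknown upstream becomes a new node
            self.edges.append((up, node))
        self.nodes.append(node)

delete_stuff = ToyTask("delete_stuff")
load = ToyTask("load")

# Variable assignment: the edge points at the exact copy that was registered.
flow = ToyFlow()
deleter = delete_stuff(flow)
loaded = load(flow, upstream=[deleter])

# No assignment: passing the definition itself adds a second, separate node.
flow2 = ToyFlow()
delete_stuff(flow2)
load(flow2, upstream=[delete_stuff])
names = [n.name for n in flow2.nodes]
print(names)
```

In this model the second flow ends up with two delete nodes, one from the bare call and one from the upstream reference, which matches the advice later in the thread not to call the task yourself when passing the bare definition as upstream.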
j
Yeah I can get you the error if that helps, invalid reference
k
Like try this:
from prefect import Flow, task

@task
def abc():
    return 1

@task
def bcd():
    return 2

with Flow("test") as flow:
    abc()
    bcd(upstream_tasks=[abc])

flow.run()
j
Shows that they ran in this case actually
is there a way to show the flow diagram?
flow.diagram()?
k
flow.visualize() if you have graphviz installed. I honestly just register and view it in the UI
You can’t always do this btw (so i generally don’t suggest it to people cuz it’s finicky). Like I think it has to be the only call of the task.
j
Right, i'd agree that it seems like a workaround that won't be sustainable
'bcd' is not recognized as an internal or external command,
operable program or batch file.
that's the error
Interesting diagram though
k
Ah then you don’t call the first abc because it will call it automatically for you
j
that is interesting
and also seems to point to the limitations of this setup
you can only use task abc once
k
Yeah so I think the variable assignment is much preferred
j
Cool, this has been very insightful, really appreciate it.
k
Of course!