Michael Smith

03/28/2022, 4:46 PM
Hello, I am looking into Prefect 2.0 and am wondering how we might implement a cleanup function. In Prefect 1, from what I have seen, we could use the "All Finished" trigger when defining the task. I can't see the equivalent in V2; any suggestions?

Andrew Huang

03/28/2022, 4:49 PM
I think wrapping it in a `finally` block works
from prefect import flow, task

@task
def do_something() -> bool:
    return True


@task
def do_something_bad():
    raise RuntimeError("Oops")


def cleanup():
    print("cleaning up")


@flow
def test():
    try:
        do_something()
        do_something()
        do_something_bad()
    finally:
        cleanup()

test()

Kevin Kho

03/28/2022, 4:51 PM
Andrew’s suggestion is interesting. I was just thinking:
@flow
def test():
    a = task_one(raise_on_exception=False)
    b = task_two(raise_on_exception=False)
    c = task_three(raise_on_exception=False)
    cleanup(wait_for=[a,b,c])
👀 1

Michael Smith

03/28/2022, 4:54 PM
I was wondering if cleanup could run prematurely with the first approach?
✅ 1
and `wait_for` seems interesting, but I earlier spotted a ReservedArgumentError here: https://github.com/PrefectHQ/prefect/blob/orion/src/prefect/tasks.py ?

Kevin Kho

03/28/2022, 4:57 PM
That `ReservedArgumentError` is on the function that is decorated during runtime, because it’s reserved for the `call`, from my understanding
See here

Andrew Huang

03/28/2022, 5:09 PM
you’re right; it did run prematurely; I wonder if that should be the expected behavior or not

Kevin Kho

03/28/2022, 5:11 PM
It might be premature because the call just creates the Future and is technically done executing right? You’d need to call .wait() or .result()
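To illustrate the point about futures outside of Prefect, here is a minimal sketch that uses only the standard library's `concurrent.futures` as a stand-in for Prefect's task runner. That substitution is an assumption made for illustration, and the names `slow_task`, `premature_cleanup` and `cleanup_after_wait` are hypothetical. Submitting work returns a future immediately, so a `finally` block that does not wait on the future runs before the work has finished.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(events):
    # Stand-in for a task that takes a while (hypothetical helper).
    time.sleep(0.2)
    events.append("task finished")


def premature_cleanup():
    events = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        try:
            pool.submit(slow_task, events)  # returns a Future immediately
        finally:
            events.append("cleanup")  # runs while slow_task is still sleeping
    # Leaving the with-block waits for the worker, so the task still finishes.
    return events


def cleanup_after_wait():
    events = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_task, events)
        try:
            future.result()  # block until the task is done (re-raises its error)
        finally:
            events.append("cleanup")
    return events
```

In the first function the cleanup entry is recorded before the task's entry; in the second, waiting on the future inside `try` reverses the order.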

Andrew Huang

03/28/2022, 5:14 PM
yep that makes sense. since finally statements are usually called for cleanup, I wonder if prefect should raise a warning? but I guess if something is passed to it, it’ll wait properly
okay it works if cleanup is decorated
import time
from prefect import flow, task

@task
def do_something() -> list:
    time.sleep(2)
    return ["a", "list"]


@task
def do_something_bad():
    raise RuntimeError("Oops")


@task
def cleanup(*args):
    for arg in args:
        del arg
    print("cleaning up")


@flow
def test():
    try:
        a = do_something()
    finally:
        cleanup(a)

test()

Michael Smith

03/28/2022, 5:35 PM
Kevin, it doesn't seem to like raise_on_exception. For the happy path the cleanup gets called fine, but the following is failing:
import requests
from prefect import flow, task
from time import sleep

@task(retries=3,retry_delay_seconds=3)
def do_something():
    print("This should run in parallel?")
    sleep(2)

@task(retries=1,retry_delay_seconds=3)
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    sleep(2)
    return response.json()

@task(retries=3,retry_delay_seconds=3)
def cleanup():
    print("Got the cleanup call!")

@flow
def api_flow(url):
    a = do_something()
    b = call_api(url, raise_on_exception=False)
    cleanup(wait_for=[a,b])


# api_flow("http://time.jsontest.com/")  # happy path
api_flow("doh")
And without the raise_on_exception (effectively Andrew's solution) I get:
18:37:18.778 | ERROR   | Task run 'call_api-190c7484-0' - Finished in state Failed('Task run encountered an exception.')
18:37:18.789 | ERROR   | Task run 'cleanup-19d7be42-0' - Finished in state NotReady("Upstream task run '01fefb08-5fb4-431a-818e-ae3b6d740ff4' did not reach a 'COMPLETED' state.", type=PENDING)
18:37:19.746 | ERROR   | Flow run 'passionate-taipan' - Finished in state Failed('1/3 states failed.')

Kevin Kho

03/28/2022, 5:56 PM
I think my syntax was wrong one sec
Ok so if `cleanup` doesn’t have to be a task, you can do:
def cleanup(*futures):
    for future in futures:
        future.wait()
    print("Got the cleanup call!")

@flow
def api_flow(url):
    a = do_something()
    b = call_api(url)
    cleanup(a,b)
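The same wait-then-clean-up idea, including a failing upstream task, can be sketched with the standard library alone. This is an analogy under stated assumptions, not Prefect's API: `concurrent.futures` stands in for the task runner, and `ok`, `bad`, `cleanup` and `run` are hypothetical names. `concurrent.futures.wait` returns once every future is done, whether it completed or raised, which is roughly what the Prefect 1 "All Finished" trigger provided.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def ok():
    return "ok"


def bad():
    raise RuntimeError("Oops")


def cleanup(*futures):
    # Block until every future is done; a failed future does not raise here.
    wait(futures)
    # Report how each upstream call ended before doing the actual cleanup work.
    return [
        "failed" if f.exception() is not None else "completed"
        for f in futures
    ]


def run():
    with ThreadPoolExecutor() as pool:
        a = pool.submit(ok)
        b = pool.submit(bad)
        return cleanup(a, b)
```

Because `cleanup` inspects `exception()` instead of calling `result()`, the upstream failure is observed without being re-raised inside the cleanup step.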