Thread
#prefect-community
    Michael Smith

    6 months ago
    Hello, I am looking into Prefect 2.0 and am wondering how we might implement a cleanup function. In Prefect 1, from what I have seen, we could use the "All Finished" trigger when defining the task. I can't see the equivalent in V2; any suggestions?
    Andrew Huang

    6 months ago
    I think wrapping it in a try/finally statement works
    from prefect import flow, task
    
    @task
    def do_something() -> bool:
        return True
    
    
    @task
    def do_something_bad():
        raise RuntimeError("Oops")
    
    
    def cleanup():
        print("cleaning up")
    
    
    @flow
    def test():
        try:
            do_something()
            do_something()
            do_something_bad()
        finally:
            cleanup()
    
    test()
    Kevin Kho

    6 months ago
    Andrew’s suggestion is interesting. I was just thinking:
    @flow
    def test():
        a = task_one(raise_on_exception=False)
        b = task_two(raise_on_exception=False)
        c = task_three(raise_on_exception=False)
        cleanup(wait_for=[a,b,c])
    Michael Smith

    6 months ago
    I was wondering if cleanup could run prematurely with the first approach?
    And wait_for seems interesting, but I earlier spotted a ReservedArgumentError here: https://github.com/PrefectHQ/prefect/blob/orion/src/prefect/tasks.py
    Kevin Kho

    6 months ago
    From my understanding, that ReservedArgumentError applies to the function being decorated: the name is reserved for the task call, so your own function can't declare it as a parameter.
    See here
    Andrew Huang

    6 months ago
    you’re right; it did run prematurely; I wonder if that should be the expected behavior or not
    Kevin Kho

    6 months ago
    It might be premature because the call just creates the Future and is technically done executing, right? You’d need to call .wait() or .result() on it.
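A minimal sketch of the timing issue described above, using Python's standard concurrent.futures instead of Prefect (an assumption, so the snippet runs without a Prefect install; slow_step, run_without_wait, and run_with_wait are hypothetical names). Submitting work only creates a future, so a finally block can fire before the work ends unless something waits on that future.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_step(events):
    """Stand-in for a task: sleeps briefly, then records that it finished."""
    time.sleep(0.2)
    events.append("step done")


def run_without_wait():
    events = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        try:
            pool.submit(slow_step, events)  # returns a Future immediately
        finally:
            events.append("cleanup")  # runs while slow_step is still sleeping
    # Leaving the with-block waits for the worker, so events is complete here.
    return events


def run_with_wait():
    events = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        try:
            future = pool.submit(slow_step, events)
            future.result()  # block until the work is done (re-raises on failure)
        finally:
            events.append("cleanup")
    return events


print(run_without_wait())  # ['cleanup', 'step done']
print(run_with_wait())     # ['step done', 'cleanup']
```

This is only an analogy: whether a given Prefect version returns a future from a plain task call depends on the release, so treat the ordering shown here as an illustration of the mechanism and not as a statement about Prefect's behavior.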
    Andrew Huang

    6 months ago
    yep that makes sense. since finally statements are usually called for cleanup, I wonder if prefect should raise a warning? but I guess if something is passed to it, it’ll wait properly
    okay it works if cleanup is decorated
    import time
    from prefect import flow, task
    
    @task
    def do_something() -> list:
        time.sleep(2)
        return ["a", "list"]
    
    
    @task
    def do_something_bad():
        raise RuntimeError("Oops")
    
    
    @task
    def cleanup(*args):
        for arg in args:
            del arg
        print("cleaning up")
    
    
    @flow
    def test():
        try:
            a = do_something()
        finally:
            cleanup(a)
    
    test()
    Michael Smith

    6 months ago
    Kevin, it doesn't seem to like raise_on_exception. For the happy path the cleanup gets called fine, but the following is failing:
    import requests
    from prefect import flow, task
    from time import sleep
    
    @task(retries=3,retry_delay_seconds=3)
    def do_something():
        print("This should run in parallel?")
        sleep(2)
    
    @task(retries=1,retry_delay_seconds=3)
    def call_api(url):
        response = requests.get(url)
        print(response.status_code)
        sleep(2)
        return response.json()
    
    @task(retries=3,retry_delay_seconds=3)
    def cleanup():
        print("Got the cleanup call!")
    
    @flow
    def api_flow(url):
        a = do_something()
        b = call_api(url, raise_on_exception=False)
        cleanup(wait_for=[a,b])
    
    
    # api_flow("http://time.jsontest.com/")  # happy path
    api_flow("doh")
    And without raise_on_exception (effectively Andrew's solution) I get:
    18:37:18.778 | ERROR   | Task run 'call_api-190c7484-0' - Finished in state Failed('Task run encountered an exception.')
    18:37:18.789 | ERROR   | Task run 'cleanup-19d7be42-0' - Finished in state NotReady("Upstream task run '01fefb08-5fb4-431a-818e-ae3b6d740ff4' did not reach a 'COMPLETED' state.", type=PENDING)
    18:37:19.746 | ERROR   | Flow run 'passionate-taipan' - Finished in state Failed('1/3 states failed.')
    Kevin Kho

    6 months ago
    I think my syntax was wrong, one sec
    Ok, so if cleanup doesn’t have to be a task, you can do:
    def cleanup(*futures):
        for future in futures:
            future.wait()
        print("Got the cleanup call!")
    
    @flow
    def api_flow(url):
        a = do_something()
        b = call_api(url)
        cleanup(a,b)
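A rough stand-in for the same idea, again with the standard library's concurrent.futures and not Prefect itself (an assumption made so it runs anywhere; ok_step, bad_step, and run_all are hypothetical names). It shows a plain cleanup function that waits for every future to finish, whether it succeeded or failed, which is the "All Finished" behavior asked about at the top of the thread. The thread does not confirm that a Prefect future's .wait() handles a failed task the same way.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def ok_step():
    return "ok"


def bad_step():
    raise RuntimeError("Oops")


def cleanup(*futures):
    """Wait for every future to finish, whether it succeeded or failed."""
    wait(futures)  # returns once all are done; does not re-raise task errors
    print("Got the cleanup call!")
    # Report the outcome of each upstream future without raising.
    return ["failed" if f.exception() else "completed" for f in futures]


def run_all():
    with ThreadPoolExecutor(max_workers=2) as pool:
        a = pool.submit(ok_step)
        b = pool.submit(bad_step)
        return cleanup(a, b)


print(run_all())  # ['completed', 'failed']
```

Here the failure in bad_step is stored on its future instead of propagating, so cleanup still runs after both steps have finished.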