
Kendall Bailey

12/09/2022, 4:00 PM
Question regarding resource management in Prefect 2.x when all tasks are submitted (i.e. run concurrently via .submit()) and some tasks are optional, in 🧵
Tried to boil this down to match my use case.
• A resource needs to be created before some tasks can run. It's slow to create.
• Other tasks can run without the resource, so these should run immediately.
• Some of the tasks that need the resource are critical and must succeed for the flow to be a success. Others are not critical and are allowed to fail.
• The resource needs to be cleaned up once all tasks that need it have either succeeded or failed.
• Some resource-using tasks depend on others, while some are independent of the others.
Example code follows:
import time, sys, contextlib
import prefect

st = time.time()

def print_(*args):
    'simplified logging, whole second granularity'
    now = int(time.time()-st)
    print('T -', now, *args)

def check(op, r):
    'Trigger failure based on script argument'
    if sys.argv[-1] == f'{op} {r} fail':
        raise RuntimeError(sys.argv[-1])

@prefect.task
def noop():
    print_('noop')

@prefect.task
def slow(m, t=3):
    print_("slow", m)
    time.sleep(t)
    check('slow', m)
    print_("slow", m, "done")

@prefect.task
def fast(m):
    print_("fast", m)
    check('fast', m)
    print_("fast", m, "done")

@contextlib.contextmanager
def resource(m):
    '''Handle create/destroy of a resource needed by
    async tasks'''
    terminals = [] # futures that have to complete before cleanup
    try:
        create = slow.submit(f'CREATE {m}')
        terminals.append(create)
        yield create, terminals
    finally:
        prefect.task(name='D')(print_).submit('DESTROYED', m, wait_for=terminals)

@prefect.flow
def flow():
    results = {}  # futures we need to succeed
    with resource('R') as (create, terminals):
        s1 = slow.submit('R', 5, wait_for=[create])
        results['s1'] = s1
        f1 = fast.submit('R1', wait_for=[s1])
        f2 = fast.submit('R2', wait_for=[create])
        results['f2'] = f2
        terminals.extend([f1, f2])

    s2 = slow.submit('None', 2)
    f3 = fast.submit('None', wait_for=[s2])

    return {name:val.result() for name, val in results.items()}


if __name__ == "__main__":
    state = flow(return_state=True)
    print("success =", state.is_completed(), state)
I attempted to create a context manager that handles the resource lifetime by tying the tasks together via wait_for. However, if any waited-for task fails, the resource is not cleaned up.
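Why the cleanup is skipped can be reproduced without Prefect. The sketch below is a toy model, not Prefect's scheduler: it assumes, as the runs below suggest, that a task submitted with wait_for only starts if every upstream it waits on completed successfully. The names run_graph, NOT_RUN and the per-node tolerant flag are hypothetical and exist only for illustration.

```python
from typing import Callable, Dict, List, Tuple

COMPLETED, FAILED, NOT_RUN = "Completed", "Failed", "NotRun"

# Hypothetical toy model: each node is (function, upstream names, tolerant).
# tolerant=False -> node runs only if all upstreams COMPLETED (plain wait_for).
# tolerant=True  -> node runs once all upstreams reached any final state.
Node = Tuple[Callable[[], None], List[str], bool]


def run_graph(nodes: Dict[str, Node]) -> Dict[str, str]:
    """Run nodes in insertion order (assumed to be a valid topological order)."""
    states: Dict[str, str] = {}
    for name, (fn, upstream, tolerant) in nodes.items():
        ups = [states[u] for u in upstream]
        if not tolerant and any(s != COMPLETED for s in ups):
            states[name] = NOT_RUN  # an upstream failed or never ran
            continue
        try:
            fn()
            states[name] = COMPLETED
        except Exception:
            states[name] = FAILED
    return states


def ok() -> None:
    pass


def boom() -> None:
    raise RuntimeError("fast R2 fail")


def example(tolerant_cleanup: bool) -> Dict[str, str]:
    # Mirrors the question: f2 fails; D (the cleanup) waits on create, f1, f2.
    return run_graph({
        "create": (ok, [], False),
        "s1": (ok, ["create"], False),
        "f1": (ok, ["s1"], False),
        "f2": (boom, ["create"], False),
        "D": (ok, ["create", "f1", "f2"], tolerant_cleanup),
    })


print(example(tolerant_cleanup=False)["D"])  # NotRun: cleanup skipped
print(example(tolerant_cleanup=True)["D"])   # Completed: cleanup runs
```

In this model the fix is a cleanup edge that waits for "finished" rather than "succeeded".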
Running the code without an argument yields the expected outcome:
T - 0 slow CREATE R
T - 0 slow None
T - 2 slow None done
T - 3 fast None
T - 3 fast None done
T - 3 slow CREATE R done
T - 4 fast R2
T - 4 fast R2 done
T - 4 slow R
T - 9 slow R done
T - 9 fast R1
T - 9 fast R1 done
T - 9 DESTROYED R
The tasks needing no resource run right away, the others run in the proper order, and the resource is destroyed. Now consider the case where f2 fails. Run with:
python test.py "fast R2 fail"
T - 0 slow None
T - 0 slow CREATE R
T - 2 slow None done
T - 2 fast None
T - 2 fast None done
T - 3 slow CREATE R done
T - 4 slow R
T - 4 fast R2
T - 9 slow R done
T - 9 fast R1
T - 9 fast R1 done
The flow fails as it should, but the resource is not destroyed. How should the example be changed to ensure the resource is cleaned up?
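One way to approach this, sketched outside Prefect with only the standard library: keep the same resource() context manager shape, but make the cleanup wait until every terminal future has finished in any state. concurrent.futures.wait returns once all futures are done, whether they returned or raised, so a failed task cannot block the cleanup. The helper names (create_resource, use, events) are hypothetical. In Prefect 2.x itself, the closest tool I know of is the allow_failure annotation (from prefect import allow_failure), e.g. wait_for=[allow_failure(f) for f in terminals]; whether wait_for honours it depends on your Prefect version, so check the docs/changelog before relying on it.

```python
import contextlib
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Iterator, List, Tuple

events: List[str] = []  # shared log so the ordering can be inspected


def create_resource(name: str) -> str:
    time.sleep(0.05)  # stand-in for the slow CREATE step
    events.append(f"CREATE {name}")
    return name


@contextlib.contextmanager
def resource(pool: ThreadPoolExecutor, name: str) -> Iterator[Tuple[Future, List[Future]]]:
    """Hypothetical stdlib analogue of the question's resource() helper."""
    create = pool.submit(create_resource, name)
    terminals: List[Future] = [create]
    try:
        yield create, terminals
    finally:
        # wait() returns once every future is done, whether it raised or not,
        # so a failed task can no longer block the cleanup.
        wait(terminals)
        events.append(f"DESTROYED {name}")


def use(create: Future, label: str, fail: bool = False) -> str:
    create.result()  # block until the resource exists (re-raises if CREATE failed)
    events.append(label)
    if fail:
        raise RuntimeError(f"{label} fail")
    return label


with ThreadPoolExecutor(max_workers=4) as pool:
    with resource(pool, "R") as (create, terminals):
        f1 = pool.submit(use, create, "fast R1")
        f2 = pool.submit(use, create, "fast R2", fail=True)  # like "fast R2 fail"
        terminals.extend([f1, f2])

print(events[-1])  # DESTROYED R, logged after every terminal future finished
```

Unlike the Prefect version, wait() here blocks the calling thread inside the finally clause; running the cleanup on its own thread would keep independent work (s2, f3 in the question) from waiting on it.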
Now run with:
python test.py "fast R1 fail"
T - 0 slow None
T - 1 slow CREATE R
T - 2 slow None done
T - 2 fast None
T - 2 fast None done
T - 4 slow CREATE R done
T - 4 fast R2
T - 4 fast R2 done
T - 4 slow R
T - 9 slow R done
T - 9 fast R1
success = True Completed()
The flow succeeds as it should, since f1 is optional, but again the resource is not destroyed.
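The success rule the example relies on (only the futures collected in results decide the flow's outcome, so f1's failure is ignored, as the "fast R1 fail" run shows) can be made explicit, and optional failures can be reported instead of silently dropped. A stdlib sketch; flow_result is a hypothetical helper, not a Prefect API:

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, List


def flow_result(critical: Dict[str, Future], optional: List[Future]) -> Dict[str, object]:
    """Hypothetical helper: critical failures propagate, optional ones are only reported."""
    for fut in optional:
        exc = fut.exception()  # blocks until done; None if it succeeded
        if exc is not None:
            print("optional task failed:", exc)
    # .result() re-raises the task's exception, failing the caller
    return {name: fut.result() for name, fut in critical.items()}


def fast(m: str, fail: bool = False) -> str:
    if fail:
        raise RuntimeError(f"fast {m} fail")
    return m


with ThreadPoolExecutor() as pool:
    s1 = pool.submit(fast, "R")
    f1 = pool.submit(fast, "R1", fail=True)  # optional, like f1 in the question
    f2 = pool.submit(fast, "R2")
    print(flow_result({"s1": s1, "f2": f2}, [f1]))  # {'s1': 'R', 'f2': 'R2'}
```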