Kendall Bailey
12/09/2022, 4:00 PMimport time, sys, contextlib
import prefect
st = time.time()
def print_(*args):
'simplified logging, whole second granularity'
now = int(time.time()-st)
print('T -', now, *args)
def check(op, r):
'Trigger failure based on script argument'
if sys.argv[-1] == f'{op} {r} fail':
raise RuntimeError(sys.argv[-1])
@prefect.task
def noop():
print_('noop')
@prefect.task
def slow(m, t=3):
print_("slow", m)
time.sleep(t)
check('slow', m)
print_("slow", m, "done")
@prefect.task
def fast(m):
print_("fast", m)
check('fast', m)
print_("fast", m, "done")
@contextlib.contextmanager
def resource(m):
'''Handle create/destroy of a resource needed by
async tasks'''
terminals = [] # futures that have to complete before cleanup
try:
create = slow.submit(f'CREATE {m}')
terminals.append(create)
yield create, terminals
finally:
prefect.task(name='D')(print_).submit('DESTROYED', m, wait_for=terminals)
@prefect.flow
def flow():
results = {} # futures we need to succeed
with resource('R') as (create, terminals):
s1 = slow.submit('R', 5, wait_for=[create])
results['s1'] = s1
f1 = fast.submit('R1', wait_for=[s1])
f2 = fast.submit('R2', wait_for=[create])
results['f2'] = f2
terminals.extend([f1, f2])
s2 = slow.submit('None', 2)
f3 = fast.submit('None', wait_for=[s2])
return {name:val.result() for name, val in results.items()}
if __name__ == "__main__":
state = flow(return_state=True)
print("success =", state.is_completed(), state)
wait_for
. However, if any waited for task fails, the resource is not cleaned up.T - 0 slow CREATE R
T - 0 slow None
T - 2 slow None done
T - 3 fast None
T - 3 fast None done
T - 3 slow CREATE R done
T - 4 fast R2
T - 4 fast R2 done
T - 4 slow R
T - 9 slow R done
T - 9 fast R1
T - 9 fast R1 done
T - 9 DESTROYED R
The tasks needing no resource run right away, the others run in the proper order and the resource is destroyed. Now consider if f2 fails. Run with python test.py "fast R2 fail"
T - 0 slow None
T - 0 slow CREATE R
T - 2 slow None done
T - 2 fast None
T - 2 fast None done
T - 3 slow CREATE R done
T - 4 slow R
T - 4 fast R2
T - 9 slow R done
T - 9 fast R1
T - 9 fast R1 done
The flow fails as it should, but the resource is not destroyed. How should example be changed to ensure the resource is cleaned up?python test.py "fast R1 fail"
T - 0 slow None
T - 1 slow CREATE R
T - 2 slow None done
T - 2 fast None
T - 2 fast None done
T - 4 slow CREATE R done
T - 4 fast R2
T - 4 fast R2 done
T - 4 slow R
T - 9 slow R done
T - 9 fast R1
success = True Completed()
succeeds as it should, since f1 is optional, and again the resource is not destroyed.