Michael Smith
03/28/2022, 4:46 PM
Andrew Huang
03/28/2022, 4:49 PM
finally statement works
from prefect import flow, task

@task
def do_something() -> bool:
    return True

@task
def do_something_bad():
    raise RuntimeError("Oops")

def cleanup():
    print("cleaning up")

@flow
def test():
    try:
        do_something()
        do_something()
        do_something_bad()
    finally:
        cleanup()

test()
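For reference, the ordering guarantee this relies on is plain Python behaviour. The sketch below drops Prefect entirely and uses a hypothetical `calls` list (not part of the snippet above) to record what ran, so it only illustrates `try`/`finally` semantics, not how Prefect schedules task runs.

```python
# Plain-Python sketch: `calls` is a hypothetical recorder added for illustration.
calls = []

def do_something() -> bool:
    calls.append("do_something")
    return True

def do_something_bad():
    calls.append("do_something_bad")
    raise RuntimeError("Oops")

def cleanup():
    calls.append("cleanup")

def run():
    try:
        do_something()
        do_something_bad()
    finally:
        # Executes whether or not the body above raised.
        cleanup()

error = None
try:
    run()
except RuntimeError as exc:
    # The exception still propagates after the finally block has run.
    error = str(exc)
```

The cleanup step runs before the error reaches the caller, so the caller still sees the original exception.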
Kevin Kho
@flow
def test():
    a = task_one(raise_on_exception=False)
    b = task_two(raise_on_exception=False)
    c = task_three(raise_on_exception=False)
    cleanup(wait_for=[a, b, c])
Michael Smith
03/28/2022, 4:54 PM
Kevin Kho
ReservedArgumentError is raised on the decorated function at runtime because the argument name is reserved for the call, from my understanding.
Andrew Huang
03/28/2022, 5:09 PM
Kevin Kho
Andrew Huang
03/28/2022, 5:14 PM
import time
from prefect import flow, task

@task
def do_something() -> list:
    time.sleep(2)
    return ["a", "list"]

@task
def do_something_bad():
    raise RuntimeError("Oops")

@task
def cleanup(*args):
    for arg in args:
        del arg
    print("cleaning up")

@flow
def test():
    try:
        a = do_something()
    finally:
        cleanup(a)

test()
Michael Smith
03/28/2022, 5:35 PM
import requests
from prefect import flow, task
from time import sleep

@task(retries=3, retry_delay_seconds=3)
def do_something():
    print("This should run in parallel?")
    sleep(2)

@task(retries=1, retry_delay_seconds=3)
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    sleep(2)
    return response.json()

@task(retries=3, retry_delay_seconds=3)
def cleanup():
    print("Got the cleanup call!")

@flow
def api_flow(url):
    a = do_something()
    b = call_api(url, raise_on_exception=False)
    cleanup(wait_for=[a, b])

# api_flow("http://time.jsontest.com/")  # happy
api_flow("doh")
18:37:18.778 | ERROR | Task run 'call_api-190c7484-0' - Finished in state Failed('Task run encountered an exception.')
18:37:18.789 | ERROR | Task run 'cleanup-19d7be42-0' - Finished in state NotReady("Upstream task run '01fefb08-5fb4-431a-818e-ae3b6d740ff4' did not reach a 'COMPLETED' state.", type=PENDING)
18:37:19.746 | ERROR | Flow run 'passionate-taipan' - Finished in state Failed('1/3 states failed.')
Kevin Kho
cleanup doesn’t have to be a task, you can do:
def cleanup(*futures):
    for future in futures:
        future.wait()
    print("Got the cleanup call!")

@flow
def api_flow(url):
    a = do_something()
    b = call_api(url)
    cleanup(a, b)
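The same "wait on the futures, then clean up regardless of outcome" idea can be sketched with the standard library alone. This is only an analogy using `concurrent.futures`, not Prefect's API: the stand-in `call_api` below always raises instead of making a request, and returning a list of outcomes from `cleanup` is an assumption added so the result can be inspected.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait

def do_something() -> str:
    return "done"

def call_api(url: str) -> dict:
    # Stand-in for the failing request in the thread; no network access.
    raise ValueError(f"bad url: {url}")

def cleanup(*futures: Future) -> list:
    # Block until every future has finished, whether it succeeded or failed.
    wait(futures)
    print("Got the cleanup call!")
    # Report each outcome without re-raising the stored exceptions.
    return ["failed" if f.exception() is not None else "ok" for f in futures]

def api_flow(url: str) -> list:
    with ThreadPoolExecutor(max_workers=2) as pool:
        a = pool.submit(do_something)
        b = pool.submit(call_api, url)
        return cleanup(a, b)

outcomes = api_flow("doh")
```

Because `cleanup` is an ordinary function that only waits on its inputs, a failed upstream call does not prevent it from running.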