roady
11/30/2022, 10:44 AMPeyton Runyan
12/01/2022, 6:45 PM.submit()
here. I'll be back with a minimum working example shortly.
from prefect import task, flow, allow_failure, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
@task
def upstream_task(item):
    """Return the string form of ``item`` with "+1" appended.

    Raises:
        Exception: when ``item`` equals "c"; this deliberately simulates a
            failing upstream task so the flow has one failed run to skip.
    """
    # Intentional failure for a single input value.
    if item == "c":
        raise Exception("this upstream task failed")

    return f"{item!s}+1"
@task
def downstream_task(item):
    """Return the string form of ``item`` with "+2" appended."""
    return f"{item!s}+2"
@flow
def demo():
    """Run ``downstream_task`` only for items whose ``upstream_task`` completed.

    Steps, in order:
      1. Submit ``upstream_task`` for every item (the equivalent of a map).
      2. Wait on every submission and collect what ``wait()`` returns
         (the final state of each task run).
      3. Submit ``downstream_task`` for each item whose upstream state type
         is "COMPLETED"; items with any other state type are skipped.
      4. Collect the downstream results without raising on failure and
         print them.
    """
    items = ["a", "b", "c", "d"]

    # Submit every upstream task before waiting on any of them.
    upstream_futures = [upstream_task.submit(entry) for entry in items]

    # Block on each submission in turn and keep its final state.
    upstream_states = []
    for future in upstream_futures:
        upstream_states.append(future.wait())

    # Only items whose upstream run completed get a downstream submission.
    # Note the downstream task receives the original item, not the upstream
    # task's return value.
    downstream_futures = [
        downstream_task.submit(entry)
        for entry, state in zip(items, upstream_states)
        if state.type == "COMPLETED"
    ]

    # Gather the downstream results; failures are returned, not raised.
    final_results = []
    for future in downstream_futures:
        final_results.append(future.result(raise_on_failure=False))

    print(final_results)
# Run the demo flow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    demo()
roady
12/05/2022, 10:02 AM