Would really appreciate some help with this :grin:...
# prefect-community
r
Would really appreciate some help with this 😁: https://prefect-community.slack.com/archives/CL09KU1K7/p1669371891611149
p
You probably need to use
.submit()
here. I'll be back with a minimum working example shortly.
🙏 1
hey there - sorry about the delay on this. Here's the general principle:
Copy code
from prefect import task, flow, allow_failure, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner

@task
def upstream_task(item):
    if item == "c":
        raise Exception("this upstream task failed")
    return str(item) + "+1"

@task
def downstream_task(item):
    return str(item) + "+2"

@flow
def demo():
    items = ["a", "b", "c", "d"]

    # asynchronously submit tasks (equivalent to map)
    submitted = []
    for item in items:
        submitted.append(
            upstream_task.submit(item)
        )

    # get the final state of each task
    finished_states = [
        s.wait() for s in submitted
    ] 

    # asynchronously submit only the tasks that succeeded
    final_submitted = []
    for item, state in zip(items, finished_states):
        if state.type == "COMPLETED":
            final_submitted.append(
                downstream_task.submit(item)
            )
    
    # get the results of all submitted tasks
    final_results = [
        f.result(raise_on_failure=False) for f in final_submitted
    ]
    print(final_results)


if __name__ == "__main__":
    demo()
🙌 1
1
If you just run this as a python script, your very last console output will be the exception that gets raised, but if you scroll up, you'll see that the correct items were run. If you run it as a deployment, it should be clearer.
r
Thanks!