<@ULVA73B9P> I have 3 tasks A,B,C. B is dependant ...
# ask-marvin
n
@Marvin I have 3 tasks A,B,C. B is dependant on A completing successfully (use wait_for). C must always run. i.e. if a task fails, it should not stop the entire flow running. This is Prefect 2. Help.
m
In Prefect 2.x, setting dependencies between tasks and ensuring that a task runs irrespective of another task's failure can be handled like this: 1. Setting Dependencies Using `wait_for`: - To ensure that Task B depends on Task A, you can use the
wait_for
argument of the Task B's function call within the flow by passing it Task A. 2. Ensuring a Task Always Runs: - To make Task C always run, you can capture the state of Task B and explicitly invoke Task C regardless of the state. Here's how you could define such flow in Prefect 2.x:
Copy code
python
from prefect import flow, task, get_run_logger

@task
def task_a():
    # ... implementation for Task A
    logger = get_run_logger()
    <http://logger.info|logger.info>("Task A completed")
    # return some value if needed

@task
def task_b(upstream_task_result):
    # ... implementation for Task B that depends on Task A
    logger = get_run_logger()
    <http://logger.info|logger.info>("Task B completed with upstream task result: {}".format(upstream_task_result))

@task
def task_c(always_run=True):
    # ... implementation for Task C that should always run
    logger = get_run_logger()
    <http://logger.info|logger.info>("Task C is running regardless of upstream tasks")

@flow
def my_flow():
    a_result = task_a()
    b_state = task_b(wait_for=[a_result])
    c_state = task_c()

my_flow()
In this example: -
task_b(wait_for=[a_result])
ensures that
task_b
will wait for
task_a
to finish before starting. -
task_c()
is set to run regardless of the state of the previous tasks. If you handle
task_c
invocation within some logic that checks the states of
task_a
and
task_b
, it can certainly run no matter what happened before. Remember to check the exact implementation as per the latest Prefect 2.x documentation since library updates could influence the exact syntax and best practices.
n
this part should be
Copy code
def my_flow():
    a_result = task_a()
    b_state = task_b(wait_for=[a_result], return_state=True) # trap any failure
    task_c()