Nicholas Andonakis
01/31/2024, 3:26 AM
Marvin
01/31/2024, 3:27 AM
wait_for argument of Task B's function call within the flow by passing it Task A.
2. Ensuring a Task Always Runs:
- To make Task C always run, you can capture the state of Task B and explicitly invoke Task C regardless of the state.
Here's how you could define such a flow in Prefect 2.x:
python
from prefect import flow, task, get_run_logger


@task
def task_a():
    # ... implementation for Task A
    logger = get_run_logger()
    logger.info("Task A completed")
    # return some value if needed


@task
def task_b(upstream_task_result=None):
    # ... implementation for Task B that depends on Task A
    # The default lets task_b be called with only wait_for, as in my_flow below.
    logger = get_run_logger()
    logger.info("Task B completed with upstream task result: {}".format(upstream_task_result))


@task
def task_c(always_run=True):
    # ... implementation for Task C that should always run
    logger = get_run_logger()
    logger.info("Task C is running regardless of upstream tasks")


@flow
def my_flow():
    a_result = task_a()
    b_state = task_b(wait_for=[a_result])
    c_state = task_c()


my_flow()
In this example:
- task_b(wait_for=[a_result]) ensures that task_b will wait for task_a to finish before starting.
- task_c() is set to run regardless of the state of the previous tasks. If you handle task_c invocation within some logic that checks the states of task_a and task_b, it can certainly run no matter what happened before.
Remember to check the exact implementation as per the latest Prefect 2.x documentation since library updates could influence the exact syntax and best practices.
Nate
01/31/2024, 4:04 AM
@flow
def my_flow():
    a_result = task_a()
    b_state = task_b(wait_for=[a_result], return_state=True)  # trap any failure
    task_c()