juana
10/03/2024, 5:23 AMMarvin
10/03/2024, 5:23 AM.submit()
method to call a task within a flow, Prefect does not utilize the ThreadPoolTaskRunner
for that task by default. Instead, tasks in Prefect are handled differently based on how they're executed:
1. When a task is called with .submit()
, it uses the ThreadPoolTaskRunner
as the default task runner. This method not only submits the task but also allows the task runner to create a PrefectFuture
, which gives you access to the task's state and result.
2. If a task is not called with .submit()
, it will run synchronously, directly within the context of the flow's execution, and not within a separate thread managed by the ThreadPoolTaskRunner
. This means it won't have the same asynchronous execution benefits and handling as tasks submitted using .submit()
.
For more details, you can refer to the Prefect 3 documentation on task runners.juana
10/03/2024, 5:28 AMMarvin
10/03/2024, 5:28 AMpython
from prefect import flow, task, get_run_logger
from prefect.states import Failed
@task
def run_shell_command():
# Example shell command
result = os.system("echo Hello World")
if result != 0:
raise Exception("Shell command failed")
@task
def process_results():
get_run_logger().info("Processing results...")
@flow
def my_flow():
try:
shell_task = run_shell_command()
except Failed:
get_run_logger().info("Shell command failed. Skipping processing.")
return
process_results()
my_flow()
In this example, run_shell_command()
executes a system command, and if it fails, the exception is caught, and the subsequent task process_results()
is skipped.
For more information about conditional dependencies and handling task failures, you can refer to Prefect 3 documentation on writing tasks and handling dependencies.