<@ULVA73B9P>, could you tell me more about `tasks`...
# ask-marvin
j
@Marvin, could you tell me more about
tasks
calling
flows
?
m
In Prefect, tasks can indeed call subflows, allowing you to do work within your workflow and pass results from other tasks to your subflow. Here are some insights about using tasks and subflows: 1. Prefect Task: A task is a Python function decorated with
@task
called within a flow. It is designed to perform a discrete, specific piece of work, such as calling an API, performing a database operation, or transforming data. A task runs concurrently, and its return value can be cached. 2. Prefect Subflow: A subflow can be thought of as a separate flow that can be invoked from within another flow. It is useful when you want to modularize portions of your workflows so that you can reuse them across different flows or manage complex workflows more efficiently. 3. When to use Subflows vs. Tasks: Use tasks for singular actions and when the ability to retry, cache, and run tasks concurrently is needed. Choose subflows for orchestrating multiple such tasks or when dealing with a complex piece of logic that would benefit from being segmented into its own flow. For further reading, you can check out these sections in the Prefect documentation: - Concepts of Flows and Tasks - Tutorial on Tasks
j
@Marvin is there a way to have only certain exceptions retried for a failed Prefect task when you set
retries
task argument?
m
Yes, you can specify a custom retry condition for a Prefect task that retries only for certain exceptions. This is implemented using the
retry_condition_fn
argument in the task decorator, where you can pass a function that determines whether or not to retry based on the type of exception raised. Here's an example of a custom retry handler:
Copy code
python
from prefect import task, flow

def retry_handler(exception):
    # Define exceptions that you do not want to retry on
    do_not_retry_on_these_codes = {httpx.ConnectError}
    
    if isinstance(exception, httpx.HTTPError) and \
       exception.response.status_code not in do_not_retry_on_these_codes:
        # Retry if the response has an unexpected status code
        return True
    elif isinstance(exception, httpx.ConnectError):
        # Do not retry on ConnectionError
        return False
    else:
        # For any other exception, retry
        return True

@task(retries=1, retry_condition_fn=retry_handler)
def my_api_call_task(url):
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@flow
def get_data_flow(url):
    my_api_call_task(url=url)

if __name__ == "__main__":
    get_data_flow(url="<https://httpbin.org/status/503>")
In this example,
retry_handler
is the custom function to determine the retry condition. It retried the task when an
httpx.HTTPError
occurred but did not retry on
httpx.ConnectError
. For further details, you can reference the Tasks documentation.
catjam 1