Jack P
04/20/2024, 4:46 PM
[...] `tasks` [...] calling flows?
Marvin
04/20/2024, 4:46 PM
1. Prefect Task: A task is a function decorated with `@task` and called within a flow. It is designed to perform a discrete, specific piece of work, such as calling an API, performing a database operation, or transforming data. Tasks can run concurrently, and their return values can be cached.
2. Prefect Subflow: A subflow can be thought of as a separate flow that can be invoked from within another flow. It is useful when you want to modularize portions of your workflows so that you can reuse them across different flows or manage complex workflows more efficiently.
3. When to use Subflows vs. Tasks: Use tasks for singular actions and when the ability to retry, cache, and run tasks concurrently is needed. Choose subflows for orchestrating multiple such tasks or when dealing with a complex piece of logic that would benefit from being segmented into its own flow.
For further reading, you can check out these sections in the Prefect documentation:
- Concepts of Flows and Tasks
- Tutorial on Tasks
Jack P
04/20/2024, 5:21 PM
[...] `retries` task argument?
Marvin
04/20/2024, 5:21 PM
[...] `retry_condition_fn` argument in the task decorator, where you can pass a function that determines whether or not to retry based on the type of exception raised.
Here's an example of a custom retry handler:
```python
import httpx
from prefect import flow, task


def retry_handler(task, task_run, state) -> bool:
    """Decide whether a failed task run should be retried."""
    # Prefect calls this with the task, the task run, and the failed state
    try:
        # Re-raise the exception captured in the failed state
        state.result()
    except httpx.HTTPStatusError:
        # Retry when the response had an error status code (e.g. 503)
        return True
    except httpx.ConnectError:
        # Do not retry on connection errors
        return False
    except Exception:
        # For any other exception, retry
        return True
    # state.result() did not raise, so there is nothing to retry
    return False


@task(retries=1, retry_condition_fn=retry_handler)
def my_api_call_task(url):
    response = httpx.get(url)
    # Raises httpx.HTTPStatusError on 4xx/5xx responses
    response.raise_for_status()
    return response.json()


@flow
def get_data_flow(url):
    my_api_call_task(url=url)


if __name__ == "__main__":
    get_data_flow(url="https://httpbin.org/status/503")
```
In this example, `retry_handler` is the custom function that determines the retry condition. It retries the task when an `httpx.HTTPError` occurs but does not retry on `httpx.ConnectError`.
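To make the idea of a retry condition concrete outside of Prefect, here is a plain-Python sketch of a retry loop that consults a predicate before each retry. This is not how Prefect implements retries; `run_with_retries`, `should_retry`, and `make_flaky` are hypothetical names used only for illustration.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def should_retry(exc: BaseException) -> bool:
    # Hypothetical policy: give up on connection errors, retry everything else
    return not isinstance(exc, ConnectionError)


def run_with_retries(
    fn: Callable[[], T],
    retries: int,
    condition: Callable[[BaseException], bool] = should_retry,
) -> T:
    """Call fn, retrying up to `retries` extra times while `condition` allows it."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            # Re-raise when retries are exhausted or the condition says no
            if attempt >= retries or not condition(exc):
                raise
            attempt += 1


def make_flaky(failures: int) -> Callable[[], str]:
    """Build a function that raises TimeoutError `failures` times, then succeeds."""
    state = {"calls": 0}

    def flaky() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise TimeoutError("temporary failure")
        return "ok"

    return flaky


if __name__ == "__main__":
    print(run_with_retries(make_flaky(2), retries=3))
```

The predicate keeps the retry policy separate from the retry loop, which is the same split that `retries` and `retry_condition_fn` provide on a task.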
For further details, you can reference the Tasks documentation.