Klashe
02/18/2025, 3:26 AMMarvin
02/18/2025, 3:26 AMPrefectConcurrentFuture
does not have an is_completed
attribute or method. This object is a type of future used by Prefect to handle task execution, but it doesn't directly provide a method to check if the task is completed in the way you're trying.
Instead, you should use the add_done_callback
method to execute a function when the future is completed. Alternatively, you might be interested in checking the state of the task through the state
attribute, which can tell you if the task is completed or failed.
Here's a refined approach to handle your logic:
python
<http://logger.info|logger.info>("Fetching API Data...")
stream_data_list = fetch_from_api.submit(conn, record_metadata_id, pages_to_fetch, params=None, headers=api_headers, return_state=True)
def on_done(future):
if future.state.is_failed():
logger.warning("Task failed with message: %s", future.state.message)
return Failed(message="Flow failed due to a critical task failure.")
stream_data_list.add_done_callback(on_done)
This way, you can handle the task completion more gracefully.
For more details, you can check the Prefect futures documentation and the Prefect task documentation.Klashe
02/18/2025, 3:30 AMMarvin
02/18/2025, 3:30 AMfetch_from_api.submit
.
In one instance, the future object (applist
) might be using a different type or version that indeed provides an is_completed()
method. Prefect's documentation or code suggests that this method isn't standard for PrefectConcurrentFuture
. This discrepancy might arise from different environments, versions, or configurations where the code is executed.
To resolve this, you can ensure consistency by using the add_done_callback
method or by checking the state
attribute of the future object, which is more consistent with Prefect's API.
For more details, you can refer to the Prefect futures documentation and check the specific implementation of the fetch_from_api
function to understand the type of future it returns.