<@ULVA73B9P> I'm receiving the error message : Er...
# ask-marvin
k
@Marvin I'm receiving the error message: Error during ETL execution: 'PrefectConcurrentFuture' object has no attribute 'is_completed' when invoking this code: logger.info("Fetching API Data...") stream_data_list = fetch_from_api.submit(conn, record_metadata_id, pages_to_fetch, params=None, headers=api_headers, return_state=True) if (stream_data_list.is_completed() is False): logger.warning("Task failed with message: %s", stream_data_list.message) return Failed(message="Flow failed due to a critical task failure.")
m
The error occurs because `PrefectConcurrentFuture` has no `is_completed` attribute or method. It is the future type Prefect uses to track a submitted task run, and it does not expose state-check helpers such as `is_completed()` directly; those belong to state objects. Instead, you can use the `add_done_callback` method to run a function once the future finishes. Alternatively, you can inspect the task's state through the future's `state` attribute, which tells you whether the task completed or failed. Here's a refined approach to handle your logic:
```python
logger.info("Fetching API Data...")
stream_data_list = fetch_from_api.submit(conn, record_metadata_id, pages_to_fetch, params=None, headers=api_headers, return_state=True)

def on_done(future):
    # Runs once the future finishes; inspect its final state here.
    if future.state.is_failed():
        logger.warning("Task failed with message: %s", future.state.message)
        # Note: a value returned from a callback does not become the flow's return value.
        return Failed(message="Flow failed due to a critical task failure.")

stream_data_list.add_done_callback(on_done)
```
This way, you can handle the task completion more gracefully. For more details, you can check the Prefect futures documentation and the Prefect task documentation.
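One caveat about the callback approach may be worth illustrating. The sketch below is an analogy using only the standard library's `concurrent.futures`, not Prefect itself, and `fetch`, `outcomes`, and `flow_outcome` are hypothetical names. It shows that whatever a done-callback returns is discarded, so a flow that needs to return a failure would have to wait on the future and inspect the outcome itself.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch(fail: bool) -> str:
    """Hypothetical stand-in for fetch_from_api."""
    if fail:
        raise RuntimeError("API unavailable")
    return "payload"


outcomes = []


def on_done(future):
    # The executor ignores this function's return value, so the outcome
    # is recorded in a shared list instead of being returned.
    error = future.exception()
    outcomes.append("failed" if error is not None else "completed")
    return "ignored"


with ThreadPoolExecutor(max_workers=1) as pool:
    ok = pool.submit(fetch, False)
    ok.add_done_callback(on_done)
    bad = pool.submit(fetch, True)
    bad.add_done_callback(on_done)
# Leaving the `with` block waits for both submitted calls to finish.


def flow_outcome(future) -> str:
    # Deciding the result in the caller keeps the decision where it can be returned.
    return "Failed" if future.exception() is not None else "Completed"
```

In a flow, the equivalent of `flow_outcome` would sit in the flow body, after waiting on the future, so that the `Failed` state can actually be returned from the flow.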
k
@Marvin But this code works: logger.info("Fetching API Data ...") applist = fetch_from_api.submit(conn, record_metadata_id, api_url, params=api_params, headers=None, return_state=True) if (applist.is_completed() is False): logger.warning("Task failed with message: %s", applist.message) return Failed(message="Flow failed due to a critical task failure.") What's the difference?
m
The key difference between the code that works and the code that doesn't appears to be the type of object returned by `fetch_from_api.submit`. In the working case, the returned object (`applist`) may be of a different type or version that does provide an `is_completed()` method. That method is not part of `PrefectConcurrentFuture`. The discrepancy might arise from different environments, versions, or configurations where the code is executed. To resolve this, you can ensure consistency by using the `add_done_callback` method or by checking the `state` attribute of the future object, which is more consistent with Prefect's API. For more details, you can refer to the Prefect futures documentation and check the specific implementation of the `fetch_from_api` function to understand the type of future it returns.
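If the same call can hand back either kind of object depending on the environment, one defensive option is to normalize the result before checking it. The following is a minimal sketch under stated assumptions: `FakeState` and `FakeFuture` are hypothetical stand-ins rather than Prefect classes, `to_state` is an illustrative helper, and the future is assumed to expose `wait()` and a `state` attribute.

```python
class FakeState:
    """Hypothetical stand-in for a state object with is_completed()."""

    def __init__(self, completed: bool, message: str = ""):
        self._completed = completed
        self.message = message

    def is_completed(self) -> bool:
        return self._completed


class FakeFuture:
    """Hypothetical stand-in for a future: only wait() and .state."""

    def __init__(self, state: FakeState):
        self._state = state
        self.waited = False

    def wait(self) -> None:
        # A real future would block here until the task run finishes.
        self.waited = True

    @property
    def state(self) -> FakeState:
        return self._state


def to_state(result):
    """Return a state-like object whether `result` is a state or a future."""
    if hasattr(result, "is_completed"):
        # Already state-like: use it directly.
        return result
    # Future-like: wait for it to finish, then read its final state.
    result.wait()
    return result.state
```

With a helper like this, the original check could be written as `if not to_state(stream_data_list).is_completed():` and would behave the same in both cases.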