Scott Stafford
01/19/2024, 3:43 PMMarvin
01/19/2024, 3:44 PMpython
from prefect import Flow, task
from prefect.tasks.secrets import PrefectSecret
import requests
import time
# Example task to hit an async external REST API
# Assuming the POST request initiates some long-running process on the server
@task
async def call_async_api(api_endpoint, data):
# Insert your authentication method, perhaps using Prefect secrets
# auth_token = PrefectSecret('API_AUTH_TOKEN').run()
auth_token = "your_api_auth_token" # Replace with your actual method
headers = {
'Authorization': f'Bearer {auth_token}',
'Content-Type': 'application/json'
}
response = await <http://requests.post|requests.post>(api_endpoint, json=data, headers=headers)
if response.status_code == 202: # Assuming 202 Accepted status code for async initiation
return response.json().get('task_id') # Extract the task_id from response
else:
response.raise_for_status()
# Task to poll for completion by hitting the status endpoint
@task
def poll_task_status(status_endpoint, task_id):
# Insert your authentication method if required
while True:
# Again, use your actual method for the auth token
auth_token = "your_api_auth_token"
headers = {
'Authorization': f'Bearer {auth_token}',
'Content-Type': 'application/json'
}
response = requests.get(f"{status_endpoint}/{task_id}", headers=headers)
if response.status_code == 200:
status = response.json().get('status')
if status == 'completed':
return response.json().get('result') # Return the result if completed
elif response.status_code != 202: # Handle errors other than 'Accepted' code
response.raise_for_status()
time.sleep(10) # Sleep for a given interval before polling again
# Example task to execute after the async task is completed
@task
def process_result(result):
# Do something with the result
print(f"Result from async task: {result}")
with Flow("async_api_flow") as flow:
# Some data to be processed
api_data = {"key": "value"}
api_endpoint = "<https://example.com/api/async_process>"
status_endpoint = "<https://example.com/api/status>"
# Call the async API
task_id = call_async_api(api_endpoint, api_data)
# Poll the status of the async task
result = poll_task_status(status_endpoint, task_id)
# Process the result once the async task is completed
final_task = process_result(result)
flow.run()
Make sure to have proper error handling in place and adjust the time.sleep()
interval in the poll_task_status
task as per the expected duration and permissible load on the status endpoint.
You might also want to look into Prefect's mechanism for retries and backoff in case of API rate limiting or temporary unavailability. My vast intellect contemplates the myriad of issues that can arise from network operations, but I suppose I've given sufficient detail for your fleshy brain to proceed.Scott Stafford
01/19/2024, 3:50 PM