<@ULVA73B9P> in my prefect flow, I want a task tha...
# ask-marvin
s
@Marvin in my Prefect flow, I want a task that will hit an async external REST API, then poll for completion of that task by periodically hitting a status endpoint. Then, execute another task when the result is ready. Can you provide some example boilerplate for that pattern?
m
Certainly, I can outline the structure of such a Prefect flow for you. Embracing the impending doom of programming tasks and the potential complexity that comes with asynchronous operations, here's a somewhat high-level approach in Python for you to consider. Keep in mind that for the specifics, one might dive deep into the Prefect documentation and related resources.
python
from prefect import Flow, task
from prefect.tasks.secrets import PrefectSecret
import requests
import time

# Example task to hit an async external REST API
# Assuming the POST request initiates some long-running process on the server
@task
def call_async_api(api_endpoint, data):
    # Insert your authentication method, perhaps using Prefect secrets
    # auth_token = PrefectSecret('API_AUTH_TOKEN').run()
    auth_token = "your_api_auth_token"  # Replace with your actual method

    headers = {
        'Authorization': f'Bearer {auth_token}',
        'Content-Type': 'application/json'
    }

    # requests is a synchronous library, so this is a plain blocking call
    response = requests.post(api_endpoint, json=data, headers=headers)
    if response.status_code == 202:  # Assuming 202 Accepted status code for async initiation
        return response.json().get('task_id')  # Extract the task_id from response
    else:
        response.raise_for_status()

# Task to poll for completion by hitting the status endpoint
@task
def poll_task_status(status_endpoint, task_id):
    # Insert your authentication method if required
    while True:
        # Again, use your actual method for the auth token
        auth_token = "your_api_auth_token" 
        headers = {
            'Authorization': f'Bearer {auth_token}',
            'Content-Type': 'application/json'
        }

        response = requests.get(f"{status_endpoint}/{task_id}", headers=headers)
        if response.status_code == 200:
            status = response.json().get('status')
            if status == 'completed':
                return response.json().get('result')  # Return the result if completed
        elif response.status_code != 202:  # Handle errors other than 'Accepted' code
            response.raise_for_status()
        time.sleep(10)  # Sleep for a given interval before polling again

# Example task to execute after the async task is completed
@task
def process_result(result):
    # Do something with the result
    print(f"Result from async task: {result}")

with Flow("async_api_flow") as flow:
    # Some data to be processed
    api_data = {"key": "value"}
    api_endpoint = "https://example.com/api/async_process"
    status_endpoint = "https://example.com/api/status"

    # Call the async API
    task_id = call_async_api(api_endpoint, api_data)

    # Poll the status of the async task
    result = poll_task_status(status_endpoint, task_id)

    # Process the result once the async task is completed
    final_task = process_result(result)

flow.run()
Make sure to have proper error handling in place and adjust the `time.sleep()` interval in the `poll_task_status` task as per the expected duration and permissible load on the status endpoint. You might also want to look into Prefect's mechanism for retries and backoff in case of API rate limiting or temporary unavailability. My vast intellect contemplates the myriad of issues that can arise from network operations, but I suppose I've given sufficient detail for your fleshy brain to proceed.
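As a rough illustration of the interval and backoff advice, here is a minimal sketch of a polling helper that lengthens the wait between checks and gives up after an overall timeout. It uses only the standard library. The helper name `poll_until_complete`, the injected `fetch_status` callable, the parameter defaults, and the `'failed'` status value are all assumptions for illustration; only the `'completed'` status and the `'result'` key come from the example above, and your API may report state differently.

```python
import time
from typing import Any, Callable, Dict, Optional


def poll_until_complete(
    fetch_status: Callable[[], Dict[str, Any]],
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
    timeout: Optional[float] = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call fetch_status() until it reports completion, backing off between calls.

    fetch_status is a hypothetical callable that returns the decoded JSON body
    of the status endpoint, e.g. {"status": "completed", "result": ...}.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    delay = initial_delay
    while True:
        payload = fetch_status()
        status = payload.get("status")
        if status == "completed":
            return payload.get("result")
        if status == "failed":  # Assumed failure marker; adjust to your API
            raise RuntimeError(f"Remote job failed: {payload}")
        # Stop if the next wait would run past the overall deadline
        if deadline is not None and time.monotonic() + delay > deadline:
            raise TimeoutError("Remote job did not complete before the timeout")
        sleep(delay)
        # Grow the wait geometrically, capped at max_delay
        delay = min(delay * backoff_factor, max_delay)
```

Inside `poll_task_status`, this could be called with something like `lambda: requests.get(f"{status_endpoint}/{task_id}", headers=headers).json()` as `fetch_status`. Note that the loop still occupies the task run for the whole wait; it only reduces load on the status endpoint.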
s
It could be a very long-running wait. I don't want to sleep and keep the task running, taking up valuable worker queue.