quick question: in ControlFlow is there a way to a...
# marvin-ai
a
quick question: in ControlFlow is there a way to apply exponential backoff to avoid rate limiting? I got a few error messages after the agents running for a few minutes:
Copy code
13:24:12.882 | ERROR   | Task run 'Call LLM' - Task run failed with exception: RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked read)') - Retries are exhausted
Traceback (most recent call last):

...

httpx.RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)
13:24:12.891 | ERROR   | Task run 'Call LLM' - Finished in state Failed('Task run encountered an exception RemoteProtocolError: peer closed connection without sending complete message body (incomplete chunked read)')
13:24:12.892 | ERROR   | Task run 'Agent turn: Video Transcript Summarization Agent' - Task run failed with exception: RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked read)') - Retries are exhausted
Traceback (most recent call last):
...

13:24:12.898 | ERROR   | Task run 'Orchestrator.run()' - Task run failed with exception: RemoteProtocolError('peer closed connection without sending complete message body (incomplete chunked read)') - Retries are exhausted
Traceback (most recent call last):
my agents look somewhat like this:
Copy code
# Create a flow for processing each video
@cf.flow(context_kwargs=["content", "TOPICS"])
def process_video_flow(content, TOPICS):
    # Task 1: Summarize Video Description
    def summarize_video():
        summary_prompt = load_prompt('prompts/summarize2.txt', transcript=content)
        summary_result = cf.run(
            objective="Summarize the video transcript to capture main insights",
            instructions=summary_prompt,
            agents=[summary_agent]
        )

        return summary_result

    summary_result = summarize_video()

    # Task 2: Categorize Video based on summary
    def categorize_video():
        categorize_prompt = load_prompt(
            'prompts/categorize2.txt',
            summary=summary_result,
            predefined_topics=TOPICS
        )
        categories_output = cf.run(
            objective="Categorize the video summary into predefined topics",
            instructions=categorize_prompt,
            result_type=CategorizationResult,
            agents=[categorize_agent]
        )                    

        categories_result = categories_output.categories # list type
        return categories_result

    categories_result = categorize_video()
n
hi @aaron ward! i would just wrap the thing you want to retry in a prefect task (since prefect is already installed when you use cf)
Copy code
In [1]: from prefect.tasks import exponential_backoff

In [2]: from prefect import task

In [3]: t = task(lambda: 1/0)

In [4]: with_retries = t.with_options(retries=10, retry_delay_seconds=exponential_backoff(3))

In [5]: with_retries()
12:38:16.145 | INFO    | Task run '<lambda>' - Task run failed with exception: ZeroDivisionError('division by zero') - Retry 1/10 will start 3 second(s) from now
12:38:19.152 | INFO    | Task run '<lambda>' - Task run failed with exception: ZeroDivisionError('division by zero') - Retry 2/10 will start 6 second(s) from now
12:38:25.160 | INFO    | Task run '<lambda>' - Task run failed with exception: ZeroDivisionError('division by zero') - Retry 3/10 will start 12 second(s) from now
12:38:37.171 | INFO    | Task run '<lambda>' - Task run failed with exception: ZeroDivisionError('division by zero') - Retry 4/10 will start 24 second(s) from now
12:39:01.189 | INFO    | Task run '<lambda>' - Task run failed with exception: ZeroDivisionError('division by zero') - Retry 5/10 will start 48 second(s) from now
12:39:49.197 | INFO    | Task run '<lambda>' - Task run failed with exception: ZeroDivisionError('division by zero') - Retry 6/10 will start 96 second(s) from now
🙌 1
so you can do this inlined
task(**options)(some_callable)
syntax or the more traditional decorator syntax
Copy code
@task(retries=..., ...)
def some_callable(): ...
s
cc @Rajan Sharma
a
@Nate thanks for the help, turns out it was an issue with my VPN being blocked from the API provider
👍 1