Skip to content

Async Usage

RiteLLM provides async support through the acompletion function, which allows you to make non-blocking API calls and handle concurrent requests efficiently.

Basic Async Usage

The simplest way to use async mode is with the acompletion function:

import asyncio
from ritellm import acompletion

async def main():
    """Send one system/user exchange through acompletion and print the reply text."""
    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello!"},
    ]

    reply = await acompletion(model="openai/gpt-3.5-turbo", messages=conversation)

    # The reply text is read from the first entry of "choices".
    first_choice = reply["choices"][0]
    print(first_choice["message"]["content"])

asyncio.run(main())

Async with Streaming

Enable streaming in async mode by setting stream=True:

import asyncio
from ritellm import acompletion

async def main():
    """Request a streamed completion and print each content fragment as it is read."""
    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Write a short poem about Python."},
    ]

    stream = await acompletion(
        model="openai/gpt-3.5-turbo",
        messages=conversation,
        stream=True,
    )

    for piece in stream:
        # Skip chunks that carry no choices at all.
        if "choices" not in piece or len(piece["choices"]) == 0:
            continue

        text = piece["choices"][0].get("delta", {}).get("content", "")
        if text:
            # No trailing newline and an explicit flush, so fragments appear immediately.
            print(text, end="", flush=True)

    # Terminate the streamed line.
    print()

asyncio.run(main())

Concurrent Requests

One of the main benefits of async is handling multiple requests concurrently:

import asyncio
from ritellm import acompletion

async def ask_question(question: str):
    """Send *question* as a single user turn and return the reply's text content.

    The request is sent with max_tokens=100; the returned value is the
    "content" of the first choice's message.
    """
    system_turn = {"role": "system", "content": "You are a helpful assistant."}
    user_turn = {"role": "user", "content": question}

    reply = await acompletion(
        model="openai/gpt-3.5-turbo",
        messages=[system_turn, user_turn],
        max_tokens=100,
    )
    return reply["choices"][0]["message"]["content"]

async def main():
    """Ask three fixed questions concurrently and print each question with its answer."""
    questions = [
        "What is Python?",
        "What is Rust?",
        "What is async programming?",
    ]

    # One ask_question coroutine per question, all awaited together;
    # gather returns results in the order the coroutines were passed.
    answers = await asyncio.gather(*map(ask_question, questions))

    for question, answer in zip(questions, answers):
        print(f"Q: {question}")
        print(f"A: {answer}\n")

asyncio.run(main())

Complete Example: Async Chat Application

Here's a complete example showing a simple async chat application:

import asyncio
from ritellm import acompletion

async def chat_streaming(messages: list[dict], model: str = "openai/gpt-3.5-turbo"):
    """Stream one assistant reply to stdout and return the accumulated text.

    Args:
        messages: Conversation so far, passed through to acompletion unchanged.
        model: Model identifier passed through to acompletion.

    Returns:
        The concatenation of every non-empty content fragment that was printed.
    """
    stream = await acompletion(
        model=model,
        messages=messages,
        stream=True,
        temperature=0.7,
        max_tokens=500,
    )

    print("Assistant: ", end="", flush=True)
    fragments = []

    for chunk in stream:
        if "choices" in chunk and len(chunk["choices"]) > 0:
            first = chunk["choices"][0]
            text = first.get("delta", {}).get("content", "")

            if text:
                print(text, end="", flush=True)
                fragments.append(text)

            # Stop reading once the first choice reports finish_reason "stop".
            if first.get("finish_reason") == "stop":
                break

    print("\n")
    return "".join(fragments)

async def main():
    """Run a scripted three-turn conversation, streaming each assistant reply."""
    history = [{"role": "system", "content": "You are a helpful assistant."}]

    # Fixed user turns standing in for interactive input.
    scripted_turns = (
        "Hello! What can you help me with?",
        "Tell me about async programming in Python.",
        "Thanks!",
    )

    for user_msg in scripted_turns:
        print(f"User: {user_msg}\n")
        history.append({"role": "user", "content": user_msg})

        # The streamed reply is appended to the history so the next
        # request is sent with the whole conversation.
        reply_text = await chat_streaming(history)
        history.append({"role": "assistant", "content": reply_text})

asyncio.run(main())

Best Practices

1. Use asyncio.gather() for Concurrent Requests

When you need to make multiple API calls, use asyncio.gather() to run them concurrently:

# Good: Concurrent requests
# All three coroutines are handed to asyncio.gather and awaited together;
# `results` holds the three responses in the order the calls are listed.
results = await asyncio.gather(
    acompletion(model="openai/gpt-3.5-turbo", messages=messages1),
    acompletion(model="openai/gpt-3.5-turbo", messages=messages2),
    acompletion(model="openai/gpt-3.5-turbo", messages=messages3)
)

# Bad: Sequential requests (slower)
# Each call is awaited to completion before the next one is started.
result1 = await acompletion(model="openai/gpt-3.5-turbo", messages=messages1)
result2 = await acompletion(model="openai/gpt-3.5-turbo", messages=messages2)
result3 = await acompletion(model="openai/gpt-3.5-turbo", messages=messages3)

2. Handle Errors Gracefully

Wrap async calls in try-except blocks to handle failures:

async def safe_completion(messages):
    """Return the acompletion response for *messages*, or None if the call raises.

    Any Exception raised by the call is reported on stdout instead of
    being propagated to the caller.
    """
    try:
        result = await acompletion(model="openai/gpt-3.5-turbo", messages=messages)
    except Exception as e:
        print(f"Error during completion: {e}")
        return None
    else:
        return result

3. Use Streaming for Long Responses

For a better user experience with long responses, use streaming:

# Good for long responses: User sees content as it arrives
# (stream=True is the only difference from the call below)
response = await acompletion(
    model="openai/gpt-3.5-turbo",
    messages=messages,
    stream=True
)

# Less ideal for long responses: User waits for entire response
# (stream=False is passed explicitly here)
response = await acompletion(
    model="openai/gpt-3.5-turbo",
    messages=messages,
    stream=False
)

4. Rate Limiting with Semaphores

Control concurrency to avoid rate limits (in the snippet below, message_list stands for your own list of message lists, one per request):

async def main():
    """Issue one acompletion call per entry of message_list, at most five at a time."""
    # NOTE: message_list is not defined in this snippet; it has to exist in
    # the enclosing scope before main() is run.
    max_in_flight = 5
    gate = asyncio.Semaphore(max_in_flight)

    async def limited_completion(messages):
        # A semaphore slot is held for the duration of the request.
        async with gate:
            reply = await acompletion(
                model="openai/gpt-3.5-turbo",
                messages=messages,
            )
        return reply

    # Every coroutine is created up front; the semaphore decides how many
    # are inside acompletion at any moment.
    pending = [limited_completion(msg) for msg in message_list]
    results = await asyncio.gather(*pending)

Comparison: Sync vs Async

Synchronous (Blocking)

from ritellm import completion

# Sequential execution - slow for multiple requests
# Each completion() call returns before the next loop iteration starts,
# so the ten requests are issued one after another.
for i in range(10):
    response = completion(
        model="openai/gpt-3.5-turbo",
        messages=[{"role": "user", "content": f"Question {i}"}]
    )
    # Print the text of the first choice's message.
    print(response["choices"][0]["message"]["content"])

Asynchronous (Non-blocking)

import asyncio
from ritellm import acompletion

async def main():
    """Send ten numbered questions concurrently and print each reply's text."""

    def ask(i):
        # Builds the coroutine for question number i; nothing is awaited here.
        return acompletion(
            model="openai/gpt-3.5-turbo",
            messages=[{"role": "user", "content": f"Question {i}"}],
        )

    # All ten coroutines are awaited together; replies come back in request order.
    replies = await asyncio.gather(*map(ask, range(10)))

    for reply in replies:
        print(reply["choices"][0]["message"]["content"])

asyncio.run(main())