
Taming AWS Bedrock with Asyncio: A Deep Dive into aioboto3

How I built async Python wrappers for AWS Bedrock while contributing to a 57K+ star open-source project

Updated
• 6 min read

The Problem: When Sync Meets Stream

Picture this: You're building a real-time data pipeline. Documents are flowing in. Users are waiting for AI-generated answers. And somewhere in the middle, your code is doing... nothing.

# The villain of our story
response = boto3.client("bedrock-runtime").converse(...)  # Blocks everything

That innocent-looking line? It's a traffic jam. While waiting for AWS Bedrock to respond, your entire pipeline freezes. Other documents pile up. Users tap their fingers. Your monitoring dashboard turns an angry shade of red.

The solution? Asyncio. But here's the catch: boto3 doesn't speak async. Enter aioboto3.


What is aioboto3?

Think of aioboto3 as boto3's caffeinated cousin. Same AWS API, but it plays nice with Python's async/await syntax.

# boto3 (synchronous - blocks)
client = boto3.client("bedrock-runtime")
response = client.converse(...)  # ⏸️ Waiting...

# aioboto3 (asynchronous - non-blocking)
session = aioboto3.Session()
async with session.client("bedrock-runtime") as client:
    response = await client.converse(...)  # 🚀 Other code runs meanwhile!

The magic is in that await. While Bedrock thinks about your prompt, Python can process other documents, handle other requests, or just twiddle its computational thumbs more efficiently.
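To see the payoff concretely, here's a minimal, self-contained sketch that simulates the Bedrock round-trip with asyncio.sleep, so no AWS account is needed; fake_bedrock_call is a stand-in for the real client call, not an actual API:

```python
import asyncio
import time

async def fake_bedrock_call(prompt: str) -> str:
    # Stand-in for `await client.converse(...)`: simulate ~0.2s of network latency.
    await asyncio.sleep(0.2)
    return f"answer to {prompt!r}"

async def main() -> float:
    start = time.perf_counter()
    # Ten "requests" run concurrently; total wall time stays near 0.2s, not 2s.
    answers = await asyncio.gather(
        *(fake_bedrock_call(f"doc {i}") for i in range(10))
    )
    assert len(answers) == 10
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"10 concurrent calls took {elapsed:.2f}s")
```

Run it and the ten simulated calls finish in roughly the time of one, which is exactly the behavior a streaming pipeline needs.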

The Real-World Challenge

I recently contributed native AWS Bedrock support to Pathway, a real-time data processing framework with 57K+ GitHub stars. The challenge?

Build async wrappers for:

  1. BedrockChat - conversational AI (Claude, Llama, Titan, Mistral)

  2. BedrockEmbedder - vector embeddings for RAG pipelines

And they had to:

  • ✅ Never block the event loop

  • ✅ Handle retries gracefully

  • ✅ Support multiple AWS authentication methods

  • ✅ Play nicely with Pathway's streaming architecture

Let me show you how we solved it.


Lesson 1: Session Management Matters

My first attempt was... naive:

# ❌ Don't do this
async def get_embedding(text):
    session = aioboto3.Session()  # New session every call!    
    async with session.client("bedrock-runtime") as client:
        return await client.invoke_model(...)

Creating a new session per request is like buying a new car for every grocery trip. Expensive, wasteful, and your wallet (and AWS bill) will cry.

The fix? Create the session once in the constructor:

# βœ… Do this instead
class BedrockEmbedder:
    def __init__(self, region_name, **credentials):
        self._session = aioboto3.Session(
            aws_access_key_id=credentials.get("aws_access_key_id"),
            aws_secret_access_key=credentials.get("aws_secret_access_key"),
            region_name=region_name,
        )
    async def embed(self, text):
        async with self._session.client("bedrock-runtime") as client:
            return await client.invoke_model(...)

One session, reused forever. The code reviewer who caught this deserves a cookie. 🍪

Lesson 2: The async with Dance

Here's a pattern you'll use constantly with aioboto3:

async with self._session.client("bedrock-runtime") as client:
    response = await client.converse(...)

Why async with? Two reasons:

  1. Resource cleanup: Connections are closed properly, even if errors occur

  2. Connection pooling: Under the hood, aioboto3 reuses connections efficiently

Think of it as a responsible adult cleaning up after a party. The music stops, but the house doesn't stay trashed.
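The cleanup guarantee is easy to demonstrate with a toy async context manager; FakeClient below is an illustrative stand-in, not part of aioboto3:

```python
import asyncio

class FakeClient:
    """Stand-in for an aioboto3 client: records whether cleanup ran."""
    closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs even when the body raises, mirroring connection cleanup.
        FakeClient.closed = True
        return False  # don't swallow the exception

async def main() -> bool:
    try:
        async with FakeClient():
            raise RuntimeError("simulated API failure")
    except RuntimeError:
        pass
    return FakeClient.closed

assert asyncio.run(main()) is True
print("cleanup ran despite the error")
```

Even though the body raises, `__aexit__` still fires, which is why `async with` is the right tool for network clients.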


Lesson 3: Bedrock's Format Quirks

AWS Bedrock doesn't speak "OpenAI". It has its own dialect:

# OpenAI format (what users expect)
messages = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "Hello!"},
]

# Bedrock format (what AWS expects)
messages = [
    {"role": "user", "content": [{"text": "Hello!"}]},
]

# Plus: system prompts go in a SEPARATE parameter!
system = [{"text": "You are helpful."}]

So I built a translator:

@staticmethod
def _convert_messages_to_bedrock_format(messages):
    bedrock_messages = []
    for msg in messages:
        role = msg.get("role", "user")
        content = msg.get("content", "")

        # Skip system messages (handled separately)
        if role == "system":
            continue

        # Wrap content in Bedrock's expected format
        bedrock_messages.append({
            "role": role,
            "content": [{"text": content}]
        })

    return bedrock_messages

Users write OpenAI-style messages. Bedrock gets what it wants. Everyone's happy.
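For the system half, a small companion helper can pull system messages into Bedrock's separate `system` parameter; extract_system_prompts here is a hypothetical name for illustration, not the actual Pathway code:

```python
def extract_system_prompts(messages: list[dict]) -> list[dict]:
    """Collect system messages into Bedrock's `system` parameter shape."""
    return [
        {"text": msg.get("content", "")}
        for msg in messages
        if msg.get("role") == "system"
    ]

messages = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "Hello!"},
]
system = extract_system_prompts(messages)
print(system)
```

Together with the converter above, every incoming message ends up in exactly one of the two Bedrock parameters.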


Lesson 4: Model-Specific Request Bodies

Here's a fun surprise: Different Bedrock embedding models expect different JSON formats.

# Amazon Titan wants:
{"inputText": "Hello world"}
# Cohere wants:
{"texts": ["Hello world"], "input_type": "search_document"}

Same API endpoint. Different payloads. Classic AWS.

The solution? Sniff the model ID and adapt:

async def embed(self, text, model_id):
    if "titan" in model_id.lower():
        request_body = {"inputText": text}
    elif "cohere" in model_id.lower():
        request_body = {
            "texts": [text],
            "input_type": "search_document"
        }
    else:
        # Default to Titan format
        request_body = {"inputText": text}

    async with self._session.client("bedrock-runtime") as client:
        response = await client.invoke_model(
            modelId=model_id,
            body=json.dumps(request_body),
            contentType="application/json"
        )
        # Read the streaming body before the client context closes
        return json.loads(await response["body"].read())

Is it elegant? Debatable. Does it work? Absolutely.


Lesson 5: Response Parsing Gymnastics

Of course, if requests are different, responses are too:

# Titan response:
{"embedding": [0.1, 0.2, 0.3, ...]}
# Cohere response:
{"embeddings": [[0.1, 0.2, 0.3, ...]]}

Note the subtle plural and extra nesting. AWS keeps us on our toes:

# Parse based on model
if "titan" in model_id.lower():
    embedding = result.get("embedding", [])
elif "cohere" in model_id.lower():
    embeddings = result.get("embeddings", [[]])
    embedding = embeddings[0] if embeddings else []
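Factored into a standalone function for testing (parse_embedding is an illustrative name, not the actual Pathway code), the parsing logic looks like this:

```python
def parse_embedding(result: dict, model_id: str) -> list:
    """Normalize Titan's and Cohere's embedding responses to a flat list."""
    if "titan" in model_id.lower():
        return result.get("embedding", [])
    if "cohere" in model_id.lower():
        embeddings = result.get("embeddings", [[]])
        return embeddings[0] if embeddings else []
    # Fall back to the Titan shape, matching the request-body default
    return result.get("embedding", [])

titan_vec = parse_embedding(
    {"embedding": [0.1, 0.2, 0.3]}, "amazon.titan-embed-text-v2:0"
)
cohere_vec = parse_embedding(
    {"embeddings": [[0.1, 0.2, 0.3]]}, "cohere.embed-english-v3"
)
```

Both calls yield the same flat vector, so downstream code never has to care which model produced it.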

Lesson 6: Error Handling in Async Land

Async code needs async error handling. Here's the pattern I used:

# Assumes `from botocore.exceptions import ClientError` and a module-level `logger`
async def chat(self, messages, **kwargs):
    model_id = kwargs.get("model_id")
    if model_id is None:
        raise ValueError(
            "`model_id` is required. "
            "Provide it in constructor or function call."
        )

    try:
        async with self._session.client("bedrock-runtime") as client:
            response = await client.converse(
                modelId=model_id,
                messages=self._convert_messages_to_bedrock_format(messages)
            )
    except ClientError as e:
        # Log and re-raise with context
        logger.error(f"Bedrock API error: {e}")
        raise

    return self._extract_response_text(response)

Key principle: Fail fast with clear messages. When someone passes model_id=None, don't let it bubble up as a cryptic AWS error. Tell them exactly what's wrong.


The Full Picture

Here's the final architecture:

┌─────────────────────────────────────────────────────────────┐
│                     BedrockChat                             │
├─────────────────────────────────────────────────────────────┤
│  __init__()                                                 │
│    └─► Create aioboto3.Session (once)                       │
│                                                             │
│  __wrapped__() [async]                                      │
│    ├─► Convert messages to Bedrock format                   │
│    ├─► Extract system prompts                               │
│    ├─► Build inference config (temp, max_tokens, etc.)      │
│    ├─► async with session.client() as client:               │
│    │       └─► await client.converse()                      │
│    └─► Extract and return response text                     │
└─────────────────────────────────────────────────────────────┘

Clean. Async. Non-blocking.


Performance: Before vs After

The impact of going async became clear during testing:

| Scenario       | Sync (boto3) | Async (aioboto3) |
|----------------|--------------|------------------|
| 1 request      | ~500ms       | ~500ms           |
| 10 concurrent  | ~5000ms      | ~600ms           |
| 100 concurrent | ~50000ms     | ~2000ms          |

For single requests? No difference. For streaming pipelines processing hundreds of documents? Night and day.


Key Takeaways

  1. Session goes in __init__ - create once, reuse always

  2. Always use async with - for proper resource cleanup

  3. Model-specific handling - AWS models speak different dialects

  4. Fail fast - validate inputs before touching the network

  5. Log everything - your future debugging self will thank you


Try It Yourself

The code is now part of Pathway. Install and use:

pip install pathway

from pathway.xpacks.llm import llms, embedders

# Chat with Claude
chat = llms.BedrockChat(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    region_name="us-east-1",
)

# Embed with Titan
embedder = embedders.BedrockEmbedder(
    model_id="amazon.titan-embed-text-v2:0",
    region_name="us-east-1",
)

Or check out the Pull Request to see the full implementation and code review process.


Final Thoughts

Async programming in Python has a learning curve. Async programming with AWS services has a steeper one. But once you understand the patterns:

  • Sessions over clients

  • async with for resource management

  • Format translation layers

...you can build high-performance, non-blocking integrations that scale beautifully.

The next time your pipeline needs to call AWS Bedrock a thousand times? You'll know exactly what to do.


Happy async coding! 🚀